Integrate circuit breaker modules with reverse proxy

This commit is contained in:
Matthew Holt 2019-09-03 19:06:54 -06:00
parent 652460e03e
commit acb8f0e0c2
No known key found for this signature in database
GPG Key ID: 2A349DD577D586A5
3 changed files with 36 additions and 2 deletions

View File

@ -64,6 +64,14 @@ type PassiveHealthChecks struct {
UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"` UnhealthyLatency caddy.Duration `json:"unhealthy_latency,omitempty"`
} }
// CircuitBreaker is a type that can act as an early-warning
// system for the health checker when backends are getting
// overloaded.
type CircuitBreaker interface {
OK() bool
RecordMetric(statusCode int, latency time.Duration)
}
// activeHealthChecker runs active health checks on a // activeHealthChecker runs active health checks on a
// regular basis and blocks until // regular basis and blocks until
// h.HealthChecks.Active.stopChan is closed. // h.HealthChecks.Active.stopChan is closed.
@ -202,7 +210,7 @@ func (h *Handler) doActiveHealthCheck(hostAddr string, host Host) error {
// remembers 1 failure for upstream for the configured // remembers 1 failure for upstream for the configured
// duration. If passive health checks are disabled or // duration. If passive health checks are disabled or
// failure expiry is 0, this is a no-op. // failure expiry is 0, this is a no-op.
func (h Handler) countFailure(upstream *Upstream) { func (h *Handler) countFailure(upstream *Upstream) {
// only count failures if passive health checking is enabled // only count failures if passive health checking is enabled
// and if failures are configured have a non-zero expiry // and if failures are configured have a non-zero expiry
if h.HealthChecks == nil || h.HealthChecks.Passive == nil { if h.HealthChecks == nil || h.HealthChecks.Passive == nil {

View File

@ -69,21 +69,29 @@ type Upstream struct {
healthCheckPolicy *PassiveHealthChecks healthCheckPolicy *PassiveHealthChecks
hostURL *url.URL hostURL *url.URL
cb CircuitBreaker
} }
// Available returns true if the remote host // Available returns true if the remote host
// is available to receive requests. // is available to receive requests. This is
// the method that should be used by selection
// policies, etc. to determine if a backend
// should be able to be sent a request.
func (u *Upstream) Available() bool { func (u *Upstream) Available() bool {
return u.Healthy() && !u.Full() return u.Healthy() && !u.Full()
} }
// Healthy returns true if the remote host // Healthy returns true if the remote host
// is currently known to be healthy or "up". // is currently known to be healthy or "up".
// It consults the circuit breaker, if any.
func (u *Upstream) Healthy() bool { func (u *Upstream) Healthy() bool {
healthy := !u.Host.Unhealthy() healthy := !u.Host.Unhealthy()
if healthy && u.healthCheckPolicy != nil { if healthy && u.healthCheckPolicy != nil {
healthy = u.Host.Fails() < u.healthCheckPolicy.MaxFails healthy = u.Host.Fails() < u.healthCheckPolicy.MaxFails
} }
if healthy && u.cb != nil {
healthy = u.cb.OK()
}
return healthy return healthy
} }

View File

@ -37,12 +37,14 @@ func init() {
// Handler implements a highly configurable and production-ready reverse proxy. // Handler implements a highly configurable and production-ready reverse proxy.
type Handler struct { type Handler struct {
TransportRaw json.RawMessage `json:"transport,omitempty"` TransportRaw json.RawMessage `json:"transport,omitempty"`
CBRaw json.RawMessage `json:"circuit_breaker,omitempty"`
LoadBalancing *LoadBalancing `json:"load_balancing,omitempty"` LoadBalancing *LoadBalancing `json:"load_balancing,omitempty"`
HealthChecks *HealthChecks `json:"health_checks,omitempty"` HealthChecks *HealthChecks `json:"health_checks,omitempty"`
Upstreams UpstreamPool `json:"upstreams,omitempty"` Upstreams UpstreamPool `json:"upstreams,omitempty"`
FlushInterval caddy.Duration `json:"flush_interval,omitempty"` FlushInterval caddy.Duration `json:"flush_interval,omitempty"`
Transport http.RoundTripper `json:"-"` Transport http.RoundTripper `json:"-"`
CB CircuitBreaker `json:"-"`
} }
// CaddyModule returns the Caddy module information. // CaddyModule returns the Caddy module information.
@ -55,6 +57,7 @@ func (Handler) CaddyModule() caddy.ModuleInfo {
// Provision ensures that h is set up properly before use. // Provision ensures that h is set up properly before use.
func (h *Handler) Provision(ctx caddy.Context) error { func (h *Handler) Provision(ctx caddy.Context) error {
// start by loading modules
if h.TransportRaw != nil { if h.TransportRaw != nil {
val, err := ctx.LoadModuleInline("protocol", "http.handlers.reverse_proxy.transport", h.TransportRaw) val, err := ctx.LoadModuleInline("protocol", "http.handlers.reverse_proxy.transport", h.TransportRaw)
if err != nil { if err != nil {
@ -73,6 +76,14 @@ func (h *Handler) Provision(ctx caddy.Context) error {
h.LoadBalancing.SelectionPolicy = val.(Selector) h.LoadBalancing.SelectionPolicy = val.(Selector)
h.LoadBalancing.SelectionPolicyRaw = nil // allow GC to deallocate - TODO: Does this help? h.LoadBalancing.SelectionPolicyRaw = nil // allow GC to deallocate - TODO: Does this help?
} }
if h.CBRaw != nil {
val, err := ctx.LoadModuleInline("type", "http.handlers.reverse_proxy.circuit_breakers", h.CBRaw)
if err != nil {
return fmt.Errorf("loading circuit breaker module: %s", err)
}
h.CB = val.(CircuitBreaker)
h.CBRaw = nil // allow GC to deallocate - TODO: Does this help?
}
if h.Transport == nil { if h.Transport == nil {
h.Transport = defaultTransport h.Transport = defaultTransport
@ -123,6 +134,8 @@ func (h *Handler) Provision(ctx caddy.Context) error {
} }
for _, upstream := range h.Upstreams { for _, upstream := range h.Upstreams {
upstream.cb = h.CB
// url parser requires a scheme // url parser requires a scheme
if !strings.Contains(upstream.Address, "://") { if !strings.Contains(upstream.Address, "://") {
upstream.Address = "http://" + upstream.Address upstream.Address = "http://" + upstream.Address
@ -307,6 +320,11 @@ func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, upstre
return err return err
} }
// update circuit breaker on current conditions
if upstream.cb != nil {
upstream.cb.RecordMetric(res.StatusCode, latency)
}
// perform passive health checks (if enabled) // perform passive health checks (if enabled)
if h.HealthChecks != nil && h.HealthChecks.Passive != nil { if h.HealthChecks != nil && h.HealthChecks.Passive != nil {
// strike if the status code matches one that is "bad" // strike if the status code matches one that is "bad"