package main

import (
	"fmt"
	"net/http"
	"net/http/httputil"
	"net/url"
	"time"

	"custom/services/config"
	"custom/services/config/static"
	"custom/util/logging"

	"github.com/cloudfoundry/storeadapter"
)

// time_AfterFunc aliases time.AfterFunc so that tests can inject a
// replacement timer implementation.
var time_AfterFunc = time.AfterFunc

// Proxy is the server proxy. It can dynamically add/remove backends as they
// come and go from the configuration. When any particular backend fails, it is
// quarantined for an amount of time relative to the number of consecutive
// failures it has had since its configuration was updated.
type Proxy struct {
	// knownBackends stores the definitive list of registered backends,
	// indexed by configuration key.
	knownBackends map[string]*backend

	// get hands out a backend to service a proxy request.
	get chan *backend

	// finished releases a backend from servicing a proxy request.
	// This includes the response code for border control inspection.
	finished chan backendWithResponseCode

	// retry is an internal channel that indicates quarantine has been lifted
	// for the backend sent on it.
	retry chan *backend

	// shutdown stops the management goroutine; for tests. NewProxy leaves it
	// nil, and a receive from a nil channel never becomes ready.
	shutdown chan bool
}

// backend holds the status and configuration for a single backend.
type backend struct {
	url            string       // target URL the backend was registered with
	handler        http.Handler // handler that forwards requests to url
	numConnections int          // number of active connections right now.
	failures       int          // number of consecutive failures
	quarantined    bool         // true while the backend must not be selected
}

// backendWithResponseCode pairs a backend with the status code recorded for
// the request it just finished serving.
type backendWithResponseCode struct {
	*backend
	code int
}

// NewProxy creates a new Proxy and returns it. This does NOT start the
// management goroutine since initial configuration may be loaded before that
// fires off. All channels are unbuffered; shutdown is left nil.
func NewProxy() *Proxy {
	return &Proxy{
		knownBackends: make(map[string]*backend),
		get:           make(chan *backend),
		finished:      make(chan backendWithResponseCode),
		retry:         make(chan *backend),
	}
}

// Proxy is the HTTP handler to proxy execution requests. It blocks until a
// backend is handed out on the get channel, forwards the request to it, and
// then reports the backend together with the recorded status code on the
// finished channel.
func (p *Proxy) Proxy(w http.ResponseWriter, r *http.Request) {
	b := <-p.get
	wc := ResponseCodeCatcher{w, 0}

	// Release the backend from a defer so that it is handed back even when
	// the backend handler panics; otherwise the backend's active connection
	// count would never be decremented. wc.code is read when the defer runs,
	// so it reflects whatever status was written before the handler exited.
	defer func() {
		p.finished <- backendWithResponseCode{b, wc.code}
	}()

	b.handler.ServeHTTP(&wc, r)
}

// ManageBackends is the background goroutine that manages the backend state.
func (p *Proxy) ManageBackends(updates <-chan storeadapter.WatchEvent) { // This goroutine uses the updates channel and the internal get & finished // channels to communicate with the outside world. // // A tricky bit is how we control 'get' requests to always return the // minimally-loaded backend. The reason that this is non-obvious is // because only want to try to send on the get channel if we have a // backend available. So we use the intermediate execute channel and // toggle that between p.get and nil accordingly. var execute chan<- *backend = nil var next *backend for { // Choose the next backend to return and set execute accordingly. next = p.selectBackend() if next == nil { execute = nil } else { execute = p.get } // Block until one of the following channel operations can proceed: select { case <-p.shutdown: return case execute <- next: next.numConnections++ case result := <-p.finished: p.handleCompletion(result.backend, result.code) case backend := <-p.retry: backend.quarantined = false case update := <-updates: switch update.Type { case storeadapter.CreateEvent, storeadapter.UpdateEvent: url, err := config.GetUrlServiceConfig(static.NewServiceConfigFromJSON(string(update.Node.Value))) if err != nil { logging.Errorf("Failed to parse JSON payload: %s %v", update.Node.Value, err) } else { key := update.Node.Key if err := p.add(key, url.String()); err != nil { logging.Errorf("Failed to register backend %s (key %q)", url.String(), key) } } case storeadapter.DeleteEvent, storeadapter.ExpireEvent: p.remove(update.Node.Key) } } } } // selectBackend chooses the minimally-loaded, non-quaratined backend. If no // backends are available, this returns nil. func (p *Proxy) selectBackend() *backend { var min *backend for _, b := range p.knownBackends { if b.quarantined { continue } // Penalize backends that have had recent failures, even after they // come out of quarantine. 
if min == nil || (b.numConnections+b.failures) < min.numConnections { min = b } } return min } // handleCompletion processes backends completing func (p *Proxy) handleCompletion(b *backend, responseCode int) { b.numConnections-- if responseCode != 500 { // yay, it did something, reset the failure count. b.failures = 0 } else { // Kaboom! Take appropriate action. b.failures++ if !b.quarantined { b.quarantined = true failureRetryTime := time.Second * time.Duration(b.failures) logging.Errorf("Backend %s failed with %d (%d failures so far): quarantined for %v", b.url, responseCode, b.failures, failureRetryTime) time_AfterFunc(failureRetryTime, func() { p.retry <- b }) } } } // add adds a new backend to the internal list. If a backend already exists // for the specified key, it is modified only if the target url has changed. func (p *Proxy) add(key, target string) error { b, exists := p.knownBackends[key] if !exists || b.url != target { u, err := url.Parse(target) if err != nil { return fmt.Errorf("Failed to add backend %s (key: %s): %v", target, key, err) } p.knownBackends[key] = &backend{ url: target, handler: httputil.NewSingleHostReverseProxy(u), } action := "Registered" if exists { action = "Updated" } logging.Infof("%s backend %s (key: %s)", action, target, key) } return nil } // remove removes a backend from the internal list. If the specified key does // not exist, nothing is done. func (p *Proxy) remove(key string) { if b, exists := p.knownBackends[key]; exists { logging.Infof("Removing backend %s (key: %s)", b.url, key) delete(p.knownBackends, key) } } // ResponseCodeCatcher is a helper that records the status code of responses. type ResponseCodeCatcher struct { http.ResponseWriter code int } func (c *ResponseCodeCatcher) WriteHeader(code int) { c.code = code c.ResponseWriter.WriteHeader(code) }