From df90d9b2f3ab9a93950853bd71a9c2970e071caf Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Wed, 4 Mar 2020 18:19:18 +0800 Subject: [PATCH] add healthcheck router --- .../router/condition/default_health_check.go | 86 +++++++++++ .../condition/default_health_check_test.go | 135 ++++++++++++++++++ cluster/router/condition/factory.go | 11 ++ cluster/router/condition/factory_test.go | 5 + .../router/condition/health_check_route.go | 61 ++++++++ .../condition/health_check_route_test.go | 113 +++++++++++++++ cluster/router/health_checker.go | 9 ++ common/extension/health_checker.go | 21 +++ common/extension/health_checker_test.go | 32 +++++ protocol/rpc_status.go | 2 +- 10 files changed, 474 insertions(+), 1 deletion(-) create mode 100644 cluster/router/condition/default_health_check.go create mode 100644 cluster/router/condition/default_health_check_test.go create mode 100644 cluster/router/condition/health_check_route.go create mode 100644 cluster/router/condition/health_check_route_test.go create mode 100644 cluster/router/health_checker.go create mode 100644 common/extension/health_checker.go create mode 100644 common/extension/health_checker_test.go diff --git a/cluster/router/condition/default_health_check.go b/cluster/router/condition/default_health_check.go new file mode 100644 index 0000000000..f006d70f38 --- /dev/null +++ b/cluster/router/condition/default_health_check.go @@ -0,0 +1,86 @@ +package condition + +import ( + "math" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + HEALTH_CHECKER = "health.checker" + DEFAULT_HEALTH_CHECKER = "default" + OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" + SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" + DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 + CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" + DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 + DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 + MAX_CIRCUIT_TRIPPED_TIMEOUT = 30000 +) + +func init() { + extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) +} + +// DefaultHealthChecker is the default +type DefaultHealthChecker struct { + OutStandingRequestConutLimit int32 + // the circuitbreaker threshold + RequestSuccessiveFailureThreshold int32 + CircuitTrippedTimeoutFactor int32 +} + +func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { + urlStatus := protocol.GetURLStatus(invoker.GetUrl()) + if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.OutStandingRequestConutLimit { + logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) + return false + } + return true +} +func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool { + circuitBreakerTimeout := c.getCircuitBreakerTimeout(status) + currentTime := protocol.CurrentTimeMillis() + if circuitBreakerTimeout <= 0 { + return false + } + return circuitBreakerTimeout > currentTime +} + +func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { + sleepWindow := c.getCircuitBreakerSleepWindowTime(status) + if sleepWindow <= 0 { + return 0 + } + return status.GetLastRequestFailedTimestamp() + sleepWindow +} + +func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { + + successiveFailureCount := status.GetSuccessiveRequestFailureCount() + diff := successiveFailureCount - c.RequestSuccessiveFailureThreshold + if diff < 0 { + return 0 + } else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { + diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF + } + sleepWindow := (1 << diff) * DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR + if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT { + sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT + } + return int64(sleepWindow) +} + +func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { + return &DefaultHealthChecker{ + OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), + RequestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), + CircuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), + } +} diff --git a/cluster/router/condition/default_health_check_test.go b/cluster/router/condition/default_health_check_test.go new file mode 100644 index 0000000000..f576a928af --- /dev/null +++ b/cluster/router/condition/default_health_check_test.go @@ -0,0 +1,135 @@ +package condition + +import ( + "math" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +func TestDefaultHealthChecker_IsHealthy(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + invoker := NewMockInvoker(url, 1) + healthy := hc.IsHealthy(invoker) + assert.True(t, healthy) + + url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") + // fake the outgoing request + for i := 0; i < 11; i++ { + request(url, "test", 0, true, false) + } + hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + // the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy + assert.False(t, hc.IsHealthy(invoker)) + + // successive failed count is more than SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy + for i := 0; i < 11; i++ { + request(url, "test", 0, false, false) + } + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + assert.False(t, hc.IsHealthy(invoker)) + + // reset successive failed count and go to healthy + request(url, "test", 0, false, true) + healthy = hc.IsHealthy(invoker) + assert.True(t, hc.IsHealthy(invoker)) +} + +func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { + + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, "test", 1, false, false) + } + sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT) + + // Adjust the threshold size to 1000 + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") + sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == 0) + + url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime == 0) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT) + +} + +func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url)) + assert.True(t, timeout == 0) + url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1)) + // timeout must after the current time + assert.True(t, timeout > protocol.CurrentTimeMillis()) + +} + +func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + status := protocol.GetURLStatus(url) + tripped := defaultHc.isCircuitBreakerTripped(status) + assert.False(t, tripped) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, "test", 1, false, false) + } + tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)) + assert.True(t, tripped) + +} + +func TestNewDefaultHealthChecker(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + assert.NotNil(t, defaultHc) + assert.Equal(t, defaultHc.OutStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, defaultHc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + + url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) + assert.NotNil(t, nondefaultHc) + assert.Equal(t, nondefaultHc.OutStandingRequestConutLimit, int32(10)) + assert.Equal(t, nondefaultHc.RequestSuccessiveFailureThreshold, int32(10)) +} + +func request(url common.URL, method string, elapsed int64, active, succeeded bool) { + protocol.BeginCount(url, method) + if !active { + protocol.EndCount(url, method, elapsed, succeeded) + } +} diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go index 66512a1387..0e3d40c2f4 100644 --- a/cluster/router/condition/factory.go +++ b/cluster/router/condition/factory.go @@ -57,3 +57,14 @@ func newAppRouterFactory() router.RouterFactory { func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) { return NewAppRouter(url) } + +type HealthCheckRouteFactory struct { +} + +func newHealthCheckRouteFactory() router.RouterFactory { + return &HealthCheckRouteFactory{} +} + +func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) { + return NewHealthCheckRouter(url) +} diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 99cec34096..97b4f7a408 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -366,3 +366,8 @@ func TestNewAppRouterFactory(t *testing.T) { factory := newAppRouterFactory() assert.NotNil(t, factory) } + +func TestHealthCheckRouteFactory(t *testing.T) { + factory := newHealthCheckRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/condition/health_check_route.go b/cluster/router/condition/health_check_route.go new file mode 100644 index 0000000000..6cdc9b6ebc --- /dev/null +++ b/cluster/router/condition/health_check_route.go @@ -0,0 +1,61 @@ +package condition + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" +) + +type HealthCheckRouter struct { + url *common.URL + enabled bool + checker router.HealthChecker +} + +func NewHealthCheckRouter(url *common.URL) (router.Router, error) { + r := &HealthCheckRouter{} + r.url = url + r.enabled = url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false) + if r.enabled { + checkerName := url.GetParam(HEALTH_CHECKER, DEFAULT_HEALTH_CHECKER) + r.checker = extension.GetHealthChecker(checkerName, url) + } + return r, nil +} + +func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if !r.enabled { + return invokers + } + var healthyInvokers []protocol.Invoker + for _, invoker := range invokers { + if r.checker.IsHealthy(invoker) { + healthyInvokers = append(healthyInvokers, invoker) + } + } + if len(healthyInvokers) == 0 { + logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey()) + return invokers + } else { + return healthyInvokers + } +} + +func (r *HealthCheckRouter) Priority() int64 { + return 0 +} + +// URL Return URL in router +func (r *HealthCheckRouter) URL() common.URL { + return *r.url +} + +func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker { + return r.checker +} diff --git a/cluster/router/condition/health_check_route_test.go b/cluster/router/condition/health_check_route_test.go new file mode 100644 index 0000000000..23a11ab1b5 --- /dev/null +++ b/cluster/router/condition/health_check_route_test.go @@ -0,0 +1,113 @@ +package condition + +import ( + "math" + "testing" + "time" +) +import ( + "github.com/stretchr/testify/assert" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestHealthCheckRouter_Route(t *testing.T) { + consumerURL, _ := common.NewURL("dubbo://192.168.10.1/com.ikurento.user.UserProvider") + consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url2, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + url3, _ := common.NewURL("dubbo://192.168.10.12:20000/com.ikurento.user.UserProvider") + hcr, _ := NewHealthCheckRouter(&consumerURL) + + var invokers []protocol.Invoker + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 1) + invoker3 := NewMockInvoker(url3, 1) + invokers = append(invokers, invoker1, invoker2, invoker3) + inv := invocation.NewRPCInvocation("test", nil, nil) + res := hcr.Route(invokers, &consumerURL, inv) + // now all invokers are healthy + assert.True(t, len(res) == len(invokers)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + } + res = hcr.Route(invokers, &consumerURL, inv) + // invokers1 is unhealthy now + assert.True(t, len(res) == 2 && !contains(res, invoker1)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + request(url2, "test", 0, false, false) + } + + res = hcr.Route(invokers, &consumerURL, inv) + // only invokers3 is healthy now + assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + request(url2, "test", 0, false, false) + request(url3, "test", 0, false, false) + } + + res = hcr.Route(invokers, &consumerURL, inv) + // now all invokers are unhealthy, so downgraded to all + assert.True(t, len(res) == 3) + + // reset the invoker1 successive failed count, so invoker1 go to healthy + request(url1, "test", 0, false, true) + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, contains(res, invoker1)) + + for i := 0; i < 6; i++ { + request(url1, "test", 0, false, false) + } + // now all invokers are unhealthy, so downgraded to all again + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, len(res) == 3) + time.Sleep(time.Second * 2) + // invoker1 go to healthy again after 2s + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, contains(res, invoker1)) + +} + +func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool { + for _, e := range invokers { + if e == invoker { + return true + } + } + return false +} + +func TestNewHealthCheckRouter(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + hcr, _ := NewHealthCheckRouter(&url) + h := hcr.(*HealthCheckRouter) + assert.Nil(t, h.checker) + + url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + hcr, _ = NewHealthCheckRouter(&url) + h = hcr.(*HealthCheckRouter) + assert.NotNil(t, h.checker) + + dhc := h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + + url.SetParam(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hcr, _ = NewHealthCheckRouter(&url) + h = hcr.(*HealthCheckRouter) + dhc = h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(1000)) + assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(500)) +} diff --git a/cluster/router/health_checker.go b/cluster/router/health_checker.go new file mode 100644 index 0000000000..58a4cbff97 --- /dev/null +++ b/cluster/router/health_checker.go @@ -0,0 +1,9 @@ +package router + +import ( + "github.com/apache/dubbo-go/protocol" +) + +type HealthChecker interface { + IsHealthy(invoker protocol.Invoker) bool +} diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go new file mode 100644 index 0000000000..67deb0cc37 --- /dev/null +++ b/common/extension/health_checker.go @@ -0,0 +1,21 @@ +package extension + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" +) + +var ( + healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker) +) + +func SethealthChecker(name string, fcn func(url *common.URL) router.HealthChecker) { + healthCheckers[name] = fcn +} + +func GetHealthChecker(name string, url *common.URL) router.HealthChecker { + if healthCheckers[name] == nil { + panic("healthCheckers for " + name + " is not existing, make sure you have import the package.") + } + return healthCheckers[name](url) +} diff --git a/common/extension/health_checker_test.go b/common/extension/health_checker_test.go new file mode 100644 index 0000000000..71547ce5b8 --- /dev/null +++ b/common/extension/health_checker_test.go @@ -0,0 +1,32 @@ +package extension + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +func TestGetHealthChecker(t *testing.T) { + SethealthChecker("mock", newMockhealthCheck) + checker := GetHealthChecker("mock", common.NewURLWithOptions()) + assert.NotNil(t, checker) +} + +type mockHealthChecker struct { +} + +func (m mockHealthChecker) IsHealthy(invoker protocol.Invoker) bool { + return true +} + +func newMockhealthCheck(url *common.URL) router.HealthChecker { + return &mockHealthChecker{} +} diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 639fd559aa..fa044d71d1 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -153,7 +153,7 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) { } atomic.StoreInt32(&rpcStatus.successiveRequestFailureCount, 0) } else { - atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, time.Now().Unix()) + atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, CurrentTimeMillis()) atomic.AddInt32(&rpcStatus.successiveRequestFailureCount, 1) atomic.AddInt32(&rpcStatus.failed, 1) atomic.AddInt64(&rpcStatus.failedElapsed, elapsed)