From df90d9b2f3ab9a93950853bd71a9c2970e071caf Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Wed, 4 Mar 2020 18:19:18 +0800 Subject: [PATCH 01/10] add healthcheck router --- .../router/condition/default_health_check.go | 86 +++++++++++ .../condition/default_health_check_test.go | 135 ++++++++++++++++++ cluster/router/condition/factory.go | 11 ++ cluster/router/condition/factory_test.go | 5 + .../router/condition/health_check_route.go | 61 ++++++++ .../condition/health_check_route_test.go | 113 +++++++++++++++ cluster/router/health_checker.go | 9 ++ common/extension/health_checker.go | 21 +++ common/extension/health_checker_test.go | 32 +++++ protocol/rpc_status.go | 2 +- 10 files changed, 474 insertions(+), 1 deletion(-) create mode 100644 cluster/router/condition/default_health_check.go create mode 100644 cluster/router/condition/default_health_check_test.go create mode 100644 cluster/router/condition/health_check_route.go create mode 100644 cluster/router/condition/health_check_route_test.go create mode 100644 cluster/router/health_checker.go create mode 100644 common/extension/health_checker.go create mode 100644 common/extension/health_checker_test.go diff --git a/cluster/router/condition/default_health_check.go b/cluster/router/condition/default_health_check.go new file mode 100644 index 0000000000..f006d70f38 --- /dev/null +++ b/cluster/router/condition/default_health_check.go @@ -0,0 +1,86 @@ +package condition + +import ( + "math" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + HEALTH_CHECKER = "health.checker" + DEFAULT_HEALTH_CHECKER = "default" + OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" + SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" + DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 + CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" + DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 + DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 + MAX_CIRCUIT_TRIPPED_TIMEOUT = 30000 +) + +func init() { + extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) +} + +// DefaultHealthChecker is the default +type DefaultHealthChecker struct { + OutStandingRequestConutLimit int32 + // the circuitbreaker threshold + RequestSuccessiveFailureThreshold int32 + CircuitTrippedTimeoutFactor int32 +} + +func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { + urlStatus := protocol.GetURLStatus(invoker.GetUrl()) + if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.OutStandingRequestConutLimit { + logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) + return false + } + return true +} +func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool { + circuitBreakerTimeout := c.getCircuitBreakerTimeout(status) + currentTime := protocol.CurrentTimeMillis() + if circuitBreakerTimeout <= 0 { + return false + } + return circuitBreakerTimeout > currentTime +} + +func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { + sleepWindow := c.getCircuitBreakerSleepWindowTime(status) + if sleepWindow <= 0 { + return 0 + } + return status.GetLastRequestFailedTimestamp() + sleepWindow +} + +func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { + + successiveFailureCount := status.GetSuccessiveRequestFailureCount() + diff := successiveFailureCount - c.RequestSuccessiveFailureThreshold + if diff < 0 { + return 0 + } else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { + diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF + } + sleepWindow := (1 << diff) * DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR + if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT { + sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT + } + return int64(sleepWindow) +} + +func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { + return &DefaultHealthChecker{ + OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), + RequestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), + CircuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), + } +} diff --git a/cluster/router/condition/default_health_check_test.go b/cluster/router/condition/default_health_check_test.go new file mode 100644 index 0000000000..f576a928af --- /dev/null +++ b/cluster/router/condition/default_health_check_test.go @@ -0,0 +1,135 @@ +package condition + +import ( + "math" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +func TestDefaultHealthChecker_IsHealthy(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + invoker := NewMockInvoker(url, 1) + healthy := hc.IsHealthy(invoker) + assert.True(t, healthy) + + url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") + // fake the outgoing request + for i := 0; i < 11; i++ { + request(url, "test", 0, true, false) + } + hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + // the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy + assert.False(t, hc.IsHealthy(invoker)) + + // successive failed count is more than SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy + for i := 0; i < 11; i++ { + request(url, "test", 0, false, false) + } + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + healthy = hc.IsHealthy(invoker) + assert.False(t, hc.IsHealthy(invoker)) + + // reset successive failed count and go to healthy + request(url, "test", 0, false, true) + healthy = hc.IsHealthy(invoker) + assert.True(t, hc.IsHealthy(invoker)) +} + +func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { + + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, "test", 1, false, false) + } + sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT) + + // Adjust the threshold size to 1000 + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") + sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) + assert.True(t, sleepWindowTime == 0) + + url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime == 0) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT) + +} + +func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url)) + assert.True(t, timeout == 0) + url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + request(url1, "test", 1, false, false) + timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1)) + // timeout must after the current time + assert.True(t, timeout > protocol.CurrentTimeMillis()) + +} + +func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + status := protocol.GetURLStatus(url) + tripped := defaultHc.isCircuitBreakerTripped(status) + assert.False(t, tripped) + // Increase the number of failed requests + for i := 0; i < 100; i++ { + request(url, "test", 1, false, false) + } + tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url)) + assert.True(t, tripped) + +} + +func TestNewDefaultHealthChecker(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) + assert.NotNil(t, defaultHc) + assert.Equal(t, defaultHc.OutStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, defaultHc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + + url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) + assert.NotNil(t, nondefaultHc) + assert.Equal(t, nondefaultHc.OutStandingRequestConutLimit, int32(10)) + assert.Equal(t, nondefaultHc.RequestSuccessiveFailureThreshold, int32(10)) +} + +func request(url common.URL, method string, elapsed int64, active, succeeded bool) { + protocol.BeginCount(url, method) + if !active { + protocol.EndCount(url, method, elapsed, succeeded) + } +} diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go index 66512a1387..0e3d40c2f4 100644 --- a/cluster/router/condition/factory.go +++ b/cluster/router/condition/factory.go @@ -57,3 +57,14 @@ func newAppRouterFactory() router.RouterFactory { func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) { return NewAppRouter(url) } + +type HealthCheckRouteFactory struct { +} + +func newHealthCheckRouteFactory() router.RouterFactory { + return &HealthCheckRouteFactory{} +} + +func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) { + return NewHealthCheckRouter(url) +} diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 99cec34096..97b4f7a408 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -366,3 +366,8 @@ func TestNewAppRouterFactory(t *testing.T) { factory := newAppRouterFactory() assert.NotNil(t, factory) } + +func TestHealthCheckRouteFactory(t *testing.T) { + factory := newHealthCheckRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/condition/health_check_route.go b/cluster/router/condition/health_check_route.go new file mode 100644 index 0000000000..6cdc9b6ebc --- /dev/null +++ b/cluster/router/condition/health_check_route.go @@ -0,0 +1,61 @@ +package condition + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +const ( + HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" +) + +type HealthCheckRouter struct { + url *common.URL + enabled bool + checker router.HealthChecker +} + +func NewHealthCheckRouter(url *common.URL) (router.Router, error) { + r := &HealthCheckRouter{} + r.url = url + r.enabled = url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false) + if r.enabled { + checkerName := url.GetParam(HEALTH_CHECKER, DEFAULT_HEALTH_CHECKER) + r.checker = extension.GetHealthChecker(checkerName, url) + } + return r, nil +} + +func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { + if !r.enabled { + return invokers + } + var healthyInvokers []protocol.Invoker + for _, invoker := range invokers { + if r.checker.IsHealthy(invoker) { + healthyInvokers = append(healthyInvokers, invoker) + } + } + if len(healthyInvokers) == 0 { + logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey()) + return invokers + } else { + return healthyInvokers + } +} + +func (r *HealthCheckRouter) Priority() int64 { + return 0 +} + +// URL Return URL in router +func (r *HealthCheckRouter) URL() common.URL { + return *r.url +} + +func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker { + return r.checker +} diff --git a/cluster/router/condition/health_check_route_test.go b/cluster/router/condition/health_check_route_test.go new file mode 100644 index 0000000000..23a11ab1b5 --- /dev/null +++ b/cluster/router/condition/health_check_route_test.go @@ -0,0 +1,113 @@ +package condition + +import ( + "math" + "testing" + "time" +) +import ( + "github.com/stretchr/testify/assert" +) +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestHealthCheckRouter_Route(t *testing.T) { + consumerURL, _ := common.NewURL("dubbo://192.168.10.1/com.ikurento.user.UserProvider") + consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + url2, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider") + url3, _ := common.NewURL("dubbo://192.168.10.12:20000/com.ikurento.user.UserProvider") + hcr, _ := NewHealthCheckRouter(&consumerURL) + + var invokers []protocol.Invoker + invoker1 := NewMockInvoker(url1, 1) + invoker2 := NewMockInvoker(url2, 1) + invoker3 := NewMockInvoker(url3, 1) + invokers = append(invokers, invoker1, invoker2, invoker3) + inv := invocation.NewRPCInvocation("test", nil, nil) + res := hcr.Route(invokers, &consumerURL, inv) + // now all invokers are healthy + assert.True(t, len(res) == len(invokers)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + } + res = hcr.Route(invokers, &consumerURL, inv) + // invokers1 is unhealthy now + assert.True(t, len(res) == 2 && !contains(res, invoker1)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + request(url2, "test", 0, false, false) + } + + res = hcr.Route(invokers, &consumerURL, inv) + // only invokers3 is healthy now + assert.True(t, len(res) == 1 && !contains(res, invoker1) && !contains(res, invoker2)) + + for i := 0; i < 10; i++ { + request(url1, "test", 0, false, false) + request(url2, "test", 0, false, false) + request(url3, "test", 0, false, false) + } + + res = hcr.Route(invokers, &consumerURL, inv) + // now all invokers are unhealthy, so downgraded to all + assert.True(t, len(res) == 3) + + // reset the invoker1 successive failed count, so invoker1 go to healthy + request(url1, "test", 0, false, true) + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, contains(res, invoker1)) + + for i := 0; i < 6; i++ { + request(url1, "test", 0, false, false) + } + // now all invokers are unhealthy, so downgraded to all again + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, len(res) == 3) + time.Sleep(time.Second * 2) + // invoker1 go to healthy again after 2s + res = hcr.Route(invokers, &consumerURL, inv) + assert.True(t, contains(res, invoker1)) + +} + +func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool { + for _, e := range invokers { + if e == invoker { + return true + } + } + return false +} + +func TestNewHealthCheckRouter(t *testing.T) { + url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") + hcr, _ := NewHealthCheckRouter(&url) + h := hcr.(*HealthCheckRouter) + assert.Nil(t, h.checker) + + url.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") + hcr, _ = NewHealthCheckRouter(&url) + h = hcr.(*HealthCheckRouter) + assert.NotNil(t, h.checker) + + dhc := h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + + url.SetParam(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") + url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + hcr, _ = NewHealthCheckRouter(&url) + h = hcr.(*HealthCheckRouter) + dhc = h.checker.(*DefaultHealthChecker) + assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(1000)) + assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(500)) +} diff --git a/cluster/router/health_checker.go b/cluster/router/health_checker.go new file mode 100644 index 0000000000..58a4cbff97 --- /dev/null +++ b/cluster/router/health_checker.go @@ -0,0 +1,9 @@ +package router + +import ( + "github.com/apache/dubbo-go/protocol" +) + +type HealthChecker interface { + IsHealthy(invoker protocol.Invoker) bool +} diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go new file mode 100644 index 0000000000..67deb0cc37 --- /dev/null +++ b/common/extension/health_checker.go @@ -0,0 +1,21 @@ +package extension + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" +) + +var ( + healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker) +) + +func SethealthChecker(name string, fcn func(url *common.URL) router.HealthChecker) { + healthCheckers[name] = fcn +} + +func GetHealthChecker(name string, url *common.URL) router.HealthChecker { + if healthCheckers[name] == nil { + panic("healthCheckers for " + name + " is not existing, make sure you have import the package.") + } + return healthCheckers[name](url) +} diff --git a/common/extension/health_checker_test.go b/common/extension/health_checker_test.go new file mode 100644 index 0000000000..71547ce5b8 --- /dev/null +++ b/common/extension/health_checker_test.go @@ -0,0 +1,32 @@ +package extension + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +func TestGetHealthChecker(t *testing.T) { + SethealthChecker("mock", newMockhealthCheck) + checker := GetHealthChecker("mock", common.NewURLWithOptions()) + assert.NotNil(t, checker) +} + +type mockHealthChecker struct { +} + +func (m mockHealthChecker) IsHealthy(invoker protocol.Invoker) bool { + return true +} + +func newMockhealthCheck(url *common.URL) router.HealthChecker { + return &mockHealthChecker{} +} diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index 639fd559aa..fa044d71d1 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -153,7 +153,7 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) { } atomic.StoreInt32(&rpcStatus.successiveRequestFailureCount, 0) } else { - atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, time.Now().Unix()) + atomic.StoreInt64(&rpcStatus.lastRequestFailedTimestamp, CurrentTimeMillis()) atomic.AddInt32(&rpcStatus.successiveRequestFailureCount, 1) atomic.AddInt32(&rpcStatus.failed, 1) atomic.AddInt64(&rpcStatus.failedElapsed, elapsed) From 65ff41cedbb2505909c74f7b1c21f84280e5a569 Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Wed, 4 Mar 2020 20:37:32 +0800 Subject: [PATCH 02/10] add comments --- cluster/router/condition/default_health_check.go | 16 +++++++++++++--- cluster/router/condition/factory.go | 1 + cluster/router/condition/health_check_route.go | 7 +++++++ cluster/router/health_checker.go | 2 ++ common/constant/key.go | 1 + common/extension/health_checker.go | 2 ++ 6 files changed, 26 insertions(+), 3 deletions(-) diff --git a/cluster/router/condition/default_health_check.go b/cluster/router/condition/default_health_check.go index f006d70f38..ef4e2e53b1 100644 --- a/cluster/router/condition/default_health_check.go +++ b/cluster/router/condition/default_health_check.go @@ -28,14 +28,19 @@ func init() { extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) } -// DefaultHealthChecker is the default +// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of +// the invoker based on the number of successive bad request and the current active request. type DefaultHealthChecker struct { + // OutStandingRequestConutLimit OutStandingRequestConutLimit int32 - // the circuitbreaker threshold + // RequestSuccessiveFailureThreshold RequestSuccessiveFailureThreshold int32 - CircuitTrippedTimeoutFactor int32 + // RequestSuccessiveFailureThreshold + CircuitTrippedTimeoutFactor int32 } +// IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request +// and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { urlStatus := protocol.GetURLStatus(invoker.GetUrl()) if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.OutStandingRequestConutLimit { @@ -44,6 +49,8 @@ func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { } return true } + +// isCircuitBreakerTripped determine whether the invoker is in the tripped state by the number of successive bad request func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool { circuitBreakerTimeout := c.getCircuitBreakerTimeout(status) currentTime := protocol.CurrentTimeMillis() @@ -53,6 +60,7 @@ func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatu return circuitBreakerTimeout > currentTime } +// getCircuitBreakerTimeout get the timestamp recovered from tripped state func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { sleepWindow := c.getCircuitBreakerSleepWindowTime(status) if sleepWindow <= 0 { @@ -61,6 +69,7 @@ func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStat return status.GetLastRequestFailedTimestamp() + sleepWindow } +// getCircuitBreakerSleepWindowTime get the sleep window time of invoker func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { successiveFailureCount := status.GetSuccessiveRequestFailureCount() @@ -77,6 +86,7 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol return int64(sleepWindow) } +// NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { return &DefaultHealthChecker{ OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go index 0e3d40c2f4..084903d341 100644 --- a/cluster/router/condition/factory.go +++ b/cluster/router/condition/factory.go @@ -27,6 +27,7 @@ import ( func init() { extension.SetRouterFactory(constant.ConditionRouterName, newConditionRouterFactory) extension.SetRouterFactory(constant.ConditionAppRouterName, newAppRouterFactory) + extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory) } // ConditionRouterFactory Condition router factory diff --git a/cluster/router/condition/health_check_route.go b/cluster/router/condition/health_check_route.go index 6cdc9b6ebc..b40d22ddb5 100644 --- a/cluster/router/condition/health_check_route.go +++ b/cluster/router/condition/health_check_route.go @@ -12,12 +12,14 @@ const ( HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled" ) +// HealthCheckRouter provides a health-first routing mechanism through HealthChecker type HealthCheckRouter struct { url *common.URL enabled bool checker router.HealthChecker } +// NewHealthCheckRouter construct an HealthCheckRouter via url func NewHealthCheckRouter(url *common.URL) (router.Router, error) { r := &HealthCheckRouter{} r.url = url @@ -29,16 +31,19 @@ func NewHealthCheckRouter(url *common.URL) (router.Router, error) { return r, nil } +// Route gets a list of healthy invoker func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker { if !r.enabled { return invokers } var healthyInvokers []protocol.Invoker + // Add healthy invoker to the list for _, invoker := range invokers { if r.checker.IsHealthy(invoker) { healthyInvokers = append(healthyInvokers, invoker) } } + // If all Invoke are considered unhealthy, downgrade to all inovker if len(healthyInvokers) == 0 { logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey()) return invokers @@ -47,6 +52,7 @@ func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, } } +// Priority func (r *HealthCheckRouter) Priority() int64 { return 0 } @@ -56,6 +62,7 @@ func (r *HealthCheckRouter) URL() common.URL { return *r.url } +// HealthyChecker returns the HealthChecker bound to this HealthCheckRouter func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker { return r.checker } diff --git a/cluster/router/health_checker.go b/cluster/router/health_checker.go index 58a4cbff97..44e1f7ace6 100644 --- a/cluster/router/health_checker.go +++ b/cluster/router/health_checker.go @@ -4,6 +4,8 @@ import ( "github.com/apache/dubbo-go/protocol" ) +// HealthChecker is used to determine whether the invoker is healthy or not type HealthChecker interface { + // IsHealthy evaluates the healthy state on the given Invoker IsHealthy(invoker protocol.Invoker) bool } diff --git a/common/constant/key.go b/common/constant/key.go index 4536d945c3..fc9c7af366 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -159,6 +159,7 @@ const ( ConditionAppRouterName = "app" // ListenableRouterName Specify listenable router name ListenableRouterName = "listenable" + HealthCheckRouterName = "health_check" // ConditionRouterRuleSuffix Specify condition router suffix ConditionRouterRuleSuffix = ".condition-router" diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go index 67deb0cc37..62ad1d47ff 100644 --- a/common/extension/health_checker.go +++ b/common/extension/health_checker.go @@ -9,10 +9,12 @@ var ( healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker) ) +// SethealthChecker set the HealthChecker with name func SethealthChecker(name string, fcn func(url *common.URL) router.HealthChecker) { healthCheckers[name] = fcn } +// GetHealthChecker get the HealthChecker with name func GetHealthChecker(name string, url *common.URL) router.HealthChecker { if healthCheckers[name] == nil { panic("healthCheckers for " + name + " is not existing, make sure you have import the package.") From 132ee77b67fe922fb69efb4bd0592ee0958413c9 Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Wed, 4 Mar 2020 20:47:39 +0800 Subject: [PATCH 03/10] fix ut --- .../condition/default_health_check_test.go | 7 +++-- .../condition/health_check_route_test.go | 2 ++ protocol/rpc_status.go | 14 ++++++++++ protocol/rpc_status_test.go | 27 +++++-------------- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/cluster/router/condition/default_health_check_test.go b/cluster/router/condition/default_health_check_test.go index f576a928af..0a7f243921 100644 --- a/cluster/router/condition/default_health_check_test.go +++ b/cluster/router/condition/default_health_check_test.go @@ -15,6 +15,7 @@ import ( ) func TestDefaultHealthChecker_IsHealthy(t *testing.T) { + defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) invoker := NewMockInvoker(url, 1) @@ -49,7 +50,7 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { } func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { - + defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) // Increase the number of failed requests @@ -75,10 +76,10 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url1, "test", 1, false, false) sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT) - } func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { + defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url)) @@ -97,6 +98,7 @@ func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { } func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) { + defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) status := protocol.GetURLStatus(url) @@ -112,6 +114,7 @@ func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) { } func TestNewDefaultHealthChecker(t *testing.T) { + defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) assert.NotNil(t, defaultHc) diff --git a/cluster/router/condition/health_check_route_test.go b/cluster/router/condition/health_check_route_test.go index 23a11ab1b5..d345f19949 100644 --- a/cluster/router/condition/health_check_route_test.go +++ b/cluster/router/condition/health_check_route_test.go @@ -15,6 +15,7 @@ import ( ) func TestHealthCheckRouter_Route(t *testing.T) { + defer protocol.CleanAllStatus() consumerURL, _ := common.NewURL("dubbo://192.168.10.1/com.ikurento.user.UserProvider") consumerURL.SetParam(HEALTH_ROUTE_ENABLED_KEY, "true") url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") @@ -86,6 +87,7 @@ func contains(invokers []protocol.Invoker, invoker protocol.Invoker) bool { } func TestNewHealthCheckRouter(t *testing.T) { + defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") hcr, _ := NewHealthCheckRouter(&url) h := hcr.(*HealthCheckRouter) diff --git a/protocol/rpc_status.go b/protocol/rpc_status.go index fa044d71d1..13be47c98e 100644 --- a/protocol/rpc_status.go +++ b/protocol/rpc_status.go @@ -167,3 +167,17 @@ func endCount0(rpcStatus *RPCStatus, elapsed int64, succeeded bool) { func CurrentTimeMillis() int64 { return time.Now().UnixNano() / int64(time.Millisecond) } + +// Destroy is used to clean all status +func CleanAllStatus() { + delete1 := func(key interface{}, value interface{}) bool { + methodStatistics.Delete(key) + return true + } + methodStatistics.Range(delete1) + delete2 := func(key interface{}, value interface{}) bool { + serviceStatistic.Delete(key) + return true + } + serviceStatistic.Range(delete2) +} diff --git a/protocol/rpc_status_test.go b/protocol/rpc_status_test.go index ffdb3b5356..5a07f44eab 100644 --- a/protocol/rpc_status_test.go +++ b/protocol/rpc_status_test.go @@ -14,7 +14,7 @@ import ( ) func TestBeginCount(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") BeginCount(url, "test") @@ -28,7 +28,7 @@ func TestBeginCount(t *testing.T) { } func TestEndCount(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") EndCount(url, "test", 100, true) @@ -41,7 +41,7 @@ func TestEndCount(t *testing.T) { } func TestGetMethodStatus(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") status := GetMethodStatus(url, "test") @@ -50,7 +50,7 @@ func TestGetMethodStatus(t *testing.T) { } func TestGetUrlStatus(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") status := GetURLStatus(url) @@ -59,7 +59,7 @@ func TestGetUrlStatus(t *testing.T) { } func Test_beginCount0(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") status := GetURLStatus(url) @@ -68,7 +68,7 @@ func Test_beginCount0(t *testing.T) { } func Test_All(t *testing.T) { - defer destroy() + defer CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") request(url, "test", 100, false, true) @@ -129,23 +129,10 @@ func request(url common.URL, method string, elapsed int64, active, succeeded boo } func TestCurrentTimeMillis(t *testing.T) { - defer destroy() + defer CleanAllStatus() c := CurrentTimeMillis() assert.NotNil(t, c) str := strconv.FormatInt(c, 10) i, _ := strconv.ParseInt(str, 10, 64) assert.Equal(t, c, i) } - -func destroy() { - delete1 := func(key interface{}, value interface{}) bool { - methodStatistics.Delete(key) - return true - } - methodStatistics.Range(delete1) - delete2 := func(key interface{}, value interface{}) bool { - serviceStatistic.Delete(key) - return true - } - serviceStatistic.Range(delete2) -} From 7368392a5780a2eda7c3a1e96a0f61c42ca10889 Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Wed, 4 Mar 2020 20:59:59 +0800 Subject: [PATCH 04/10] add apache license --- .../router/condition/default_health_check.go | 17 +++++++++++++++++ .../condition/default_health_check_test.go | 17 +++++++++++++++++ .../router/condition/health_check_route.go | 17 +++++++++++++++++ .../condition/health_check_route_test.go | 19 +++++++++++++++++++ cluster/router/health_checker.go | 17 +++++++++++++++++ common/extension/health_checker.go | 17 +++++++++++++++++ common/extension/health_checker_test.go | 17 +++++++++++++++++ 7 files changed, 121 insertions(+) diff --git a/cluster/router/condition/default_health_check.go b/cluster/router/condition/default_health_check.go index ef4e2e53b1..70542feb49 100644 --- a/cluster/router/condition/default_health_check.go +++ b/cluster/router/condition/default_health_check.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package condition import ( diff --git a/cluster/router/condition/default_health_check_test.go b/cluster/router/condition/default_health_check_test.go index 0a7f243921..730e787ce4 100644 --- a/cluster/router/condition/default_health_check_test.go +++ b/cluster/router/condition/default_health_check_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package condition import ( diff --git a/cluster/router/condition/health_check_route.go b/cluster/router/condition/health_check_route.go index b40d22ddb5..e25baf3b5d 100644 --- a/cluster/router/condition/health_check_route.go +++ b/cluster/router/condition/health_check_route.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package condition import ( diff --git a/cluster/router/condition/health_check_route_test.go b/cluster/router/condition/health_check_route_test.go index d345f19949..e2fe0856f9 100644 --- a/cluster/router/condition/health_check_route_test.go +++ b/cluster/router/condition/health_check_route_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package condition import ( @@ -5,9 +22,11 @@ import ( "testing" "time" ) + import ( "github.com/stretchr/testify/assert" ) + import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" diff --git a/cluster/router/health_checker.go b/cluster/router/health_checker.go index 44e1f7ace6..d9e3087a27 100644 --- a/cluster/router/health_checker.go +++ b/cluster/router/health_checker.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package router import ( diff --git a/common/extension/health_checker.go b/common/extension/health_checker.go index 62ad1d47ff..365c5d0910 100644 --- a/common/extension/health_checker.go +++ b/common/extension/health_checker.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package extension import ( diff --git a/common/extension/health_checker_test.go b/common/extension/health_checker_test.go index 71547ce5b8..ec934e6e9c 100644 --- a/common/extension/health_checker_test.go +++ b/common/extension/health_checker_test.go @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package extension import ( From 3d7d50f7358bf4787c497ed91c0365cdc7a72690 Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Wed, 4 Mar 2020 21:32:58 +0800 Subject: [PATCH 05/10] fix fmt failure --- common/constant/key.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/constant/key.go b/common/constant/key.go index fc9c7af366..3ba8eed8df 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -158,7 +158,7 @@ const ( // ConditionAppRouterName Specify listenable application router name ConditionAppRouterName = "app" // ListenableRouterName Specify listenable router name - ListenableRouterName = "listenable" + ListenableRouterName = "listenable" HealthCheckRouterName = "health_check" // ConditionRouterRuleSuffix Specify condition router suffix From 0fc5ceb1ff3cb6cffecc079f838a1f268f58289a Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Thu, 5 Mar 2020 09:50:36 +0800 Subject: [PATCH 06/10] move healthcheck router to healthcheck dir --- cluster/router/condition/factory.go | 14 +--- cluster/router/condition/factory_test.go | 7 +- .../default_health_check.go | 2 +- .../default_health_check_test.go | 3 +- cluster/router/healthcheck/factory.go | 44 ++++++++++++ cluster/router/healthcheck/factory_test.go | 67 +++++++++++++++++++ .../health_check_route.go | 2 +- .../health_check_route_test.go | 2 +- 8 files changed, 118 insertions(+), 23 deletions(-) rename cluster/router/{condition => healthcheck}/default_health_check.go (99%) rename cluster/router/{condition => healthcheck}/default_health_check_test.go (99%) create mode 100644 cluster/router/healthcheck/factory.go create mode 100644 cluster/router/healthcheck/factory_test.go rename cluster/router/{condition => healthcheck}/health_check_route.go (99%) rename cluster/router/{condition => healthcheck}/health_check_route_test.go (99%) diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go index 084903d341..68534aee44 100644 --- a/cluster/router/condition/factory.go +++ b/cluster/router/condition/factory.go @@ -27,7 +27,6 @@ import ( func init() { extension.SetRouterFactory(constant.ConditionRouterName, newConditionRouterFactory) extension.SetRouterFactory(constant.ConditionAppRouterName, newAppRouterFactory) - extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory) } // ConditionRouterFactory Condition router factory @@ -57,15 +56,4 @@ func newAppRouterFactory() router.RouterFactory { // NewRouter Create AppRouterFactory by URL func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) { return NewAppRouter(url) -} - -type HealthCheckRouteFactory struct { -} - -func newHealthCheckRouteFactory() router.RouterFactory { - return &HealthCheckRouteFactory{} -} - -func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) { - return NewHealthCheckRouter(url) -} +} \ No newline at end of file diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 97b4f7a408..54afcd4241 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -365,9 +365,4 @@ func TestNewConditionRouterFactory(t *testing.T) { func TestNewAppRouterFactory(t *testing.T) { factory := newAppRouterFactory() assert.NotNil(t, factory) -} - -func TestHealthCheckRouteFactory(t *testing.T) { - factory := newHealthCheckRouteFactory() - assert.NotNil(t, factory) -} +} \ No newline at end of file diff --git a/cluster/router/condition/default_health_check.go b/cluster/router/healthcheck/default_health_check.go similarity index 99% rename from cluster/router/condition/default_health_check.go rename to cluster/router/healthcheck/default_health_check.go index 70542feb49..44d5f0e95c 100644 --- a/cluster/router/condition/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package condition +package healthcheck import ( "math" diff --git a/cluster/router/condition/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go similarity index 99% rename from cluster/router/condition/default_health_check_test.go rename to cluster/router/healthcheck/default_health_check_test.go index 730e787ce4..5ab2bfb47d 100644 --- a/cluster/router/condition/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package condition +package healthcheck import ( "math" @@ -32,6 +32,7 @@ import ( ) func TestDefaultHealthChecker_IsHealthy(t *testing.T) { + defer protocol.CleanAllStatus() url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) diff --git a/cluster/router/healthcheck/factory.go b/cluster/router/healthcheck/factory.go new file mode 100644 index 0000000000..337013c9a4 --- /dev/null +++ b/cluster/router/healthcheck/factory.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "github.com/apache/dubbo-go/cluster/router" + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" +) + +func init() { + extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory) +} + + +// HealthCheckRouteFactory +type HealthCheckRouteFactory struct { +} + +// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory +func newHealthCheckRouteFactory() router.RouterFactory { + return &HealthCheckRouteFactory{} +} + +// NewRouter construct a new NewHealthCheckRouter via url +func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) { + return NewHealthCheckRouter(url) +} diff --git a/cluster/router/healthcheck/factory_test.go b/cluster/router/healthcheck/factory_test.go new file mode 100644 index 0000000000..824a442715 --- /dev/null +++ b/cluster/router/healthcheck/factory_test.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package healthcheck + +import ( + "context" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) +type MockInvoker struct { + url common.URL +} + +func NewMockInvoker(url common.URL, successCount int) *MockInvoker { + return &MockInvoker{ + url: url, + } +} + +func (bi *MockInvoker) GetUrl() common.URL { + return bi.url +} +func (bi *MockInvoker) IsAvailable() bool { + return true +} + +func (bi *MockInvoker) IsDestroyed() bool { + return true +} + + + +func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { + return nil +} + +func (bi *MockInvoker) Destroy() { +} + + +func TestHealthCheckRouteFactory(t *testing.T) { + factory := newHealthCheckRouteFactory() + assert.NotNil(t, factory) +} diff --git a/cluster/router/condition/health_check_route.go b/cluster/router/healthcheck/health_check_route.go similarity index 99% rename from cluster/router/condition/health_check_route.go rename to cluster/router/healthcheck/health_check_route.go index e25baf3b5d..7e66f9d0c2 100644 --- a/cluster/router/condition/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package condition +package healthcheck import ( "github.com/apache/dubbo-go/cluster/router" diff --git a/cluster/router/condition/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go similarity index 99% rename from cluster/router/condition/health_check_route_test.go rename to cluster/router/healthcheck/health_check_route_test.go index e2fe0856f9..6ad5ce6b92 100644 --- a/cluster/router/condition/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -15,7 +15,7 @@ * limitations under the License. */ -package condition +package healthcheck import ( "math" From 9ae15f11b3d2172af77f8260f88fcb4ed598861b Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Thu, 5 Mar 2020 16:32:19 +0800 Subject: [PATCH 07/10] fix comment --- .../healthcheck/default_health_check.go | 34 ++++++++++--------- .../healthcheck/default_health_check_test.go | 12 +++---- cluster/router/healthcheck/factory.go | 1 - cluster/router/healthcheck/factory_test.go | 8 ++--- .../healthcheck/health_check_route_test.go | 12 +++---- common/constant/key.go | 1 + 6 files changed, 34 insertions(+), 34 deletions(-) diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index 44d5f0e95c..e580dc1040 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -38,7 +38,7 @@ const ( CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 - MAX_CIRCUIT_TRIPPED_TIMEOUT = 30000 + MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 ) func init() { @@ -48,19 +48,19 @@ func init() { // DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of // the invoker based on the number of successive bad request and the current active request. type DefaultHealthChecker struct { - // OutStandingRequestConutLimit - OutStandingRequestConutLimit int32 - // RequestSuccessiveFailureThreshold - RequestSuccessiveFailureThreshold int32 - // RequestSuccessiveFailureThreshold - CircuitTrippedTimeoutFactor int32 + // outStandingRequestConutLimit + outStandingRequestConutLimit int32 + // requestSuccessiveFailureThreshold + requestSuccessiveFailureThreshold int32 + // requestSuccessiveFailureThreshold + circuitTrippedTimeoutFactor int32 } // IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { urlStatus := protocol.GetURLStatus(invoker.GetUrl()) - if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.OutStandingRequestConutLimit { + if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.outStandingRequestConutLimit { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) return false } @@ -77,7 +77,7 @@ func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatu return circuitBreakerTimeout > currentTime } -// getCircuitBreakerTimeout get the timestamp recovered from tripped state +// getCircuitBreakerTimeout get the timestamp recovered from tripped state, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 { sleepWindow := c.getCircuitBreakerSleepWindowTime(status) if sleepWindow <= 0 { @@ -86,19 +86,19 @@ func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStat return status.GetLastRequestFailedTimestamp() + sleepWindow } -// getCircuitBreakerSleepWindowTime get the sleep window time of invoker +// getCircuitBreakerSleepWindowTime get the sleep window time of invoker, the unit is millisecond func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { successiveFailureCount := status.GetSuccessiveRequestFailureCount() - diff := successiveFailureCount - c.RequestSuccessiveFailureThreshold + diff := successiveFailureCount - c.requestSuccessiveFailureThreshold if diff < 0 { return 0 } else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF } sleepWindow := (1 << diff) * DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR - if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT { - sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT + if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { + sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS } return int64(sleepWindow) } @@ -106,8 +106,10 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol // NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { return &DefaultHealthChecker{ - OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), - RequestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), - CircuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), + outStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), + requestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), + circuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), } } + + diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go index 5ab2bfb47d..39408c8a4f 100644 --- a/cluster/router/healthcheck/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -76,7 +76,7 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url, "test", 1, false, false) } sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) - assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT) + assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) // Adjust the threshold size to 1000 url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") @@ -93,7 +93,7 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url1, "test", 1, false, false) request(url1, "test", 1, false, false) sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) - assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) } func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { @@ -136,16 +136,16 @@ func TestNewDefaultHealthChecker(t *testing.T) { url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) assert.NotNil(t, defaultHc) - assert.Equal(t, defaultHc.OutStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, defaultHc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) assert.NotNil(t, nondefaultHc) - assert.Equal(t, nondefaultHc.OutStandingRequestConutLimit, int32(10)) - assert.Equal(t, nondefaultHc.RequestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10)) + assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10)) } func request(url common.URL, method string, elapsed int64, active, succeeded bool) { diff --git a/cluster/router/healthcheck/factory.go b/cluster/router/healthcheck/factory.go index 337013c9a4..32d84d145c 100644 --- a/cluster/router/healthcheck/factory.go +++ b/cluster/router/healthcheck/factory.go @@ -28,7 +28,6 @@ func init() { extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory) } - // HealthCheckRouteFactory type HealthCheckRouteFactory struct { } diff --git a/cluster/router/healthcheck/factory_test.go b/cluster/router/healthcheck/factory_test.go index 824a442715..a9d94da7c3 100644 --- a/cluster/router/healthcheck/factory_test.go +++ b/cluster/router/healthcheck/factory_test.go @@ -30,13 +30,14 @@ import ( "github.com/apache/dubbo-go/common" "github.com/apache/dubbo-go/protocol" ) + type MockInvoker struct { - url common.URL + url common.URL } func NewMockInvoker(url common.URL, successCount int) *MockInvoker { return &MockInvoker{ - url: url, + url: url, } } @@ -51,8 +52,6 @@ func (bi *MockInvoker) IsDestroyed() bool { return true } - - func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol.Result { return nil } @@ -60,7 +59,6 @@ func (bi *MockInvoker) Invoke(_ context.Context, _ protocol.Invocation) protocol func (bi *MockInvoker) Destroy() { } - func TestHealthCheckRouteFactory(t *testing.T) { factory := newHealthCheckRouteFactory() assert.NotNil(t, factory) diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index 6ad5ce6b92..c48fdc7d6a 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -118,9 +118,9 @@ func TestNewHealthCheckRouter(t *testing.T) { assert.NotNil(t, h.checker) dhc := h.checker.(*DefaultHealthChecker) - assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) - assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) url.SetParam(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") @@ -128,7 +128,7 @@ func TestNewHealthCheckRouter(t *testing.T) { hcr, _ = NewHealthCheckRouter(&url) h = hcr.(*HealthCheckRouter) dhc = h.checker.(*DefaultHealthChecker) - assert.Equal(t, dhc.OutStandingRequestConutLimit, int32(1000)) - assert.Equal(t, dhc.RequestSuccessiveFailureThreshold, int32(10)) - assert.Equal(t, dhc.CircuitTrippedTimeoutFactor, int32(500)) + assert.Equal(t, dhc.outStandingRequestConutLimit, int32(1000)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(10)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(500)) } diff --git a/common/constant/key.go b/common/constant/key.go index 3ba8eed8df..648dc48d8f 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -159,6 +159,7 @@ const ( ConditionAppRouterName = "app" // ListenableRouterName Specify listenable router name ListenableRouterName = "listenable" + // HealthCheckRouterName Specify the name of HealthCheckRouter HealthCheckRouterName = "health_check" // ConditionRouterRuleSuffix Specify condition router suffix From 44b6f9f518eb7f1175905ec003d8dcb2b4ae70e0 Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Thu, 5 Mar 2020 17:31:10 +0800 Subject: [PATCH 08/10] fix comment --- cluster/router/condition/factory.go | 2 +- cluster/router/condition/factory_test.go | 2 +- cluster/router/healthcheck/default_health_check.go | 2 -- common/constant/key.go | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/cluster/router/condition/factory.go b/cluster/router/condition/factory.go index 68534aee44..66512a1387 100644 --- a/cluster/router/condition/factory.go +++ b/cluster/router/condition/factory.go @@ -56,4 +56,4 @@ func newAppRouterFactory() router.RouterFactory { // NewRouter Create AppRouterFactory by URL func (c *AppRouterFactory) NewRouter(url *common.URL) (router.Router, error) { return NewAppRouter(url) -} \ No newline at end of file +} diff --git a/cluster/router/condition/factory_test.go b/cluster/router/condition/factory_test.go index 54afcd4241..99cec34096 100644 --- a/cluster/router/condition/factory_test.go +++ b/cluster/router/condition/factory_test.go @@ -365,4 +365,4 @@ func TestNewConditionRouterFactory(t *testing.T) { func TestNewAppRouterFactory(t *testing.T) { factory := newAppRouterFactory() assert.NotNil(t, factory) -} \ No newline at end of file +} diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index e580dc1040..5be1027603 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -111,5 +111,3 @@ func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { circuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), } } - - diff --git a/common/constant/key.go b/common/constant/key.go index 648dc48d8f..5690e53ed9 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -158,7 +158,7 @@ const ( // ConditionAppRouterName Specify listenable application router name ConditionAppRouterName = "app" // ListenableRouterName Specify listenable router name - ListenableRouterName = "listenable" + ListenableRouterName = "listenable" // HealthCheckRouterName Specify the name of HealthCheckRouter HealthCheckRouterName = "health_check" From 9674573164c6dcf0699653121a01d69d23d1d01f Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Thu, 5 Mar 2020 23:55:54 +0800 Subject: [PATCH 09/10] mv constant val to constant package --- .../healthcheck/default_health_check.go | 50 ++++++++++--------- .../healthcheck/default_health_check_test.go | 23 +++++---- .../router/healthcheck/health_check_route.go | 3 +- .../healthcheck/health_check_route_test.go | 11 ++-- common/constant/key.go | 13 +++++ 5 files changed, 60 insertions(+), 40 deletions(-) diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index 5be1027603..fc9879798e 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -24,25 +24,14 @@ import ( import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" ) -const ( - HEALTH_CHECKER = "health.checker" - DEFAULT_HEALTH_CHECKER = "default" - OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" - SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" - DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 - CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" - DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 - DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 - MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 -) - func init() { - extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) + extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker) } // DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of @@ -60,7 +49,7 @@ type DefaultHealthChecker struct { // and the current active request func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool { urlStatus := protocol.GetURLStatus(invoker.GetUrl()) - if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.outStandingRequestConutLimit { + if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestConutLimit() { logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key()) return false } @@ -90,24 +79,39 @@ func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStat func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 { successiveFailureCount := status.GetSuccessiveRequestFailureCount() - diff := successiveFailureCount - c.requestSuccessiveFailureThreshold + diff := successiveFailureCount - c.GetRequestSuccessiveFailureThreshold() if diff < 0 { return 0 - } else if diff > DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { - diff = DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF + } else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF { + diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF } - sleepWindow := (1 << diff) * DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR - if sleepWindow > MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { - sleepWindow = MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS + sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor() + if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS { + sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS } return int64(sleepWindow) } +// GetOutStandingRequestConutLimit return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 { + return c.requestSuccessiveFailureThreshold +} + +// GetOutStandingRequestConutLimit return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 { + return c.circuitTrippedTimeoutFactor +} + +// GetOutStandingRequestConutLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker +func (c *DefaultHealthChecker) GetOutStandingRequestConutLimit() int32 { + return c.outStandingRequestConutLimit +} + // NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url func NewDefaultHealthChecker(url *common.URL) router.HealthChecker { return &DefaultHealthChecker{ - outStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), - requestSuccessiveFailureThreshold: int32(url.GetParamInt(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), - circuitTrippedTimeoutFactor: int32(url.GetParamInt(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), + outStandingRequestConutLimit: int32(url.GetParamInt(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)), + requestSuccessiveFailureThreshold: int32(url.GetParamInt(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)), + circuitTrippedTimeoutFactor: int32(url.GetParamInt(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)), } } diff --git a/cluster/router/healthcheck/default_health_check_test.go b/cluster/router/healthcheck/default_health_check_test.go index 39408c8a4f..74aa394074 100644 --- a/cluster/router/healthcheck/default_health_check_test.go +++ b/cluster/router/healthcheck/default_health_check_test.go @@ -28,6 +28,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" ) @@ -40,8 +41,8 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { healthy := hc.IsHealthy(invoker) assert.True(t, healthy) - url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100") // fake the outgoing request for i := 0; i < 11; i++ { request(url, "test", 0, true, false) @@ -51,12 +52,12 @@ func TestDefaultHealthChecker_IsHealthy(t *testing.T) { // the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy assert.False(t, hc.IsHealthy(invoker)) - // successive failed count is more than SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy + // successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy for i := 0; i < 11; i++ { request(url, "test", 0, false, false) } - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") - url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker) healthy = hc.IsHealthy(invoker) assert.False(t, hc.IsHealthy(invoker)) @@ -76,10 +77,10 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url, "test", 1, false, false) } sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) - assert.True(t, sleepWindowTime == MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) + assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) // Adjust the threshold size to 1000 - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000") sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url)) assert.True(t, sleepWindowTime == 0) @@ -93,7 +94,7 @@ func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) { request(url1, "test", 1, false, false) request(url1, "test", 1, false, false) sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1)) - assert.True(t, sleepWindowTime > 0 && sleepWindowTime < MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) + assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS) } func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) { @@ -137,11 +138,11 @@ func TestNewDefaultHealthChecker(t *testing.T) { defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker) assert.NotNil(t, defaultHc) assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) + assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)) url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider") - url1.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") - url1.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10") + url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker) assert.NotNil(t, nondefaultHc) assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10)) diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 7e66f9d0c2..481a85acb0 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -20,6 +20,7 @@ package healthcheck import ( "github.com/apache/dubbo-go/cluster/router" "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/common/extension" "github.com/apache/dubbo-go/common/logger" "github.com/apache/dubbo-go/protocol" @@ -42,7 +43,7 @@ func NewHealthCheckRouter(url *common.URL) (router.Router, error) { r.url = url r.enabled = url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false) if r.enabled { - checkerName := url.GetParam(HEALTH_CHECKER, DEFAULT_HEALTH_CHECKER) + checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) r.checker = extension.GetHealthChecker(checkerName, url) } return r, nil diff --git a/cluster/router/healthcheck/health_check_route_test.go b/cluster/router/healthcheck/health_check_route_test.go index c48fdc7d6a..759ef93dbe 100644 --- a/cluster/router/healthcheck/health_check_route_test.go +++ b/cluster/router/healthcheck/health_check_route_test.go @@ -29,6 +29,7 @@ import ( import ( "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" "github.com/apache/dubbo-go/protocol" "github.com/apache/dubbo-go/protocol/invocation" ) @@ -119,12 +120,12 @@ func TestNewHealthCheckRouter(t *testing.T) { dhc := h.checker.(*DefaultHealthChecker) assert.Equal(t, dhc.outStandingRequestConutLimit, int32(math.MaxInt32)) - assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) - assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) + assert.Equal(t, dhc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_THRESHOLD)) + assert.Equal(t, dhc.circuitTrippedTimeoutFactor, int32(constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)) - url.SetParam(CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") - url.SetParam(SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") - url.SetParam(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") + url.SetParam(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, "500") + url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10") + url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000") hcr, _ = NewHealthCheckRouter(&url) h = hcr.(*HealthCheckRouter) dhc = h.checker.(*DefaultHealthChecker) diff --git a/common/constant/key.go b/common/constant/key.go index 5690e53ed9..0ecd4f752e 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -190,3 +190,16 @@ const ( ACCESS_KEY_ID_KEY = "accessKeyId" SECRET_ACCESS_KEY_KEY = "secretAccessKey" ) + +// HealthCheck Router +const ( + HEALTH_CHECKER = "health.checker" + DEFAULT_HEALTH_CHECKER = "default" + OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" + SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" + DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 + CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" + DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 + DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 + MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 +) From 91830f5ab5ce42f21e405c149418a58421052afc Mon Sep 17 00:00:00 2001 From: CodingSinger Date: Fri, 6 Mar 2020 21:06:49 +0800 Subject: [PATCH 10/10] add some comments --- .../healthcheck/default_health_check.go | 6 ++--- .../router/healthcheck/health_check_route.go | 12 ++++----- common/constant/key.go | 25 +++++++++++++------ 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/cluster/router/healthcheck/default_health_check.go b/cluster/router/healthcheck/default_health_check.go index fc9879798e..a26f86ddac 100644 --- a/cluster/router/healthcheck/default_health_check.go +++ b/cluster/router/healthcheck/default_health_check.go @@ -37,11 +37,11 @@ func init() { // DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of // the invoker based on the number of successive bad request and the current active request. type DefaultHealthChecker struct { - // outStandingRequestConutLimit + // the limit of outstanding request outStandingRequestConutLimit int32 - // requestSuccessiveFailureThreshold + // the threshold of successive-failure-request requestSuccessiveFailureThreshold int32 - // requestSuccessiveFailureThreshold + // value of circuit-tripped timeout factor circuitTrippedTimeoutFactor int32 } diff --git a/cluster/router/healthcheck/health_check_route.go b/cluster/router/healthcheck/health_check_route.go index 481a85acb0..1ddc9ccb17 100644 --- a/cluster/router/healthcheck/health_check_route.go +++ b/cluster/router/healthcheck/health_check_route.go @@ -39,9 +39,10 @@ type HealthCheckRouter struct { // NewHealthCheckRouter construct an HealthCheckRouter via url func NewHealthCheckRouter(url *common.URL) (router.Router, error) { - r := &HealthCheckRouter{} - r.url = url - r.enabled = url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false) + r := &HealthCheckRouter{ + url: url, + enabled: url.GetParamBool(HEALTH_ROUTE_ENABLED_KEY, false), + } if r.enabled { checkerName := url.GetParam(constant.HEALTH_CHECKER, constant.DEFAULT_HEALTH_CHECKER) r.checker = extension.GetHealthChecker(checkerName, url) @@ -54,7 +55,7 @@ func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, if !r.enabled { return invokers } - var healthyInvokers []protocol.Invoker + healthyInvokers := make([]protocol.Invoker, 0, len(invokers)) // Add healthy invoker to the list for _, invoker := range invokers { if r.checker.IsHealthy(invoker) { @@ -65,9 +66,8 @@ func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, if len(healthyInvokers) == 0 { logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey()) return invokers - } else { - return healthyInvokers } + return healthyInvokers } // Priority diff --git a/common/constant/key.go b/common/constant/key.go index 0ecd4f752e..c8a03b3be9 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -193,13 +193,22 @@ const ( // HealthCheck Router const ( - HEALTH_CHECKER = "health.checker" - DEFAULT_HEALTH_CHECKER = "default" - OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" - SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" - DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 - CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" + // The key of HealthCheck SPI + HEALTH_CHECKER = "health.checker" + // The name of the default implementation of HealthChecker + DEFAULT_HEALTH_CHECKER = "default" + // The key of oustanding-request-limit + OUTSTANDING_REQUEST_COUNT_LIMIT_KEY = "outstanding.request.limit" + // The key of successive-failed-request's threshold + SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY = "successive.failed.threshold" + // The key of circuit-tripped timeout factor + CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY = "circuit.tripped.timeout.factor" + // The default threshold of successive-failed-request if not specfied + DEFAULT_SUCCESSIVE_FAILED_THRESHOLD = 5 + // The default maximum diff between successive-failed-request's threshold and actual successive-failed-request's count DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF = 5 - DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 - MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 + // The default factor of circuit-tripped timeout if not specfied + DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR = 1000 + // The default time window of circuit-tripped in millisecond if not specfied + MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS = 30000 )