Skip to content

Commit

Permalink
Merge pull request #389 from CodingSinger/HEALTH_CHECK
Browse files Browse the repository at this point in the history
add healthcheck router
  • Loading branch information
AlexStocks authored Mar 7, 2020
2 parents 008a1f3 + 91830f5 commit ab850a4
Show file tree
Hide file tree
Showing 12 changed files with 766 additions and 21 deletions.
28 changes: 28 additions & 0 deletions cluster/router/health_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package router

import (
"github.com/apache/dubbo-go/protocol"
)

// HealthChecker is used to determine whether the invoker is healthy or not
type HealthChecker interface {
// IsHealthy evaluates the healthy state on the given Invoker
IsHealthy(invoker protocol.Invoker) bool
}
117 changes: 117 additions & 0 deletions cluster/router/healthcheck/default_health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package healthcheck

import (
"math"
)

import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

func init() {
extension.SethealthChecker(constant.DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}

// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
// the invoker based on the number of successive bad request and the current active request.
type DefaultHealthChecker struct {
// the limit of outstanding request
outStandingRequestConutLimit int32
// the threshold of successive-failure-request
requestSuccessiveFailureThreshold int32
// value of circuit-tripped timeout factor
circuitTrippedTimeoutFactor int32
}

// IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request
// and the current active request
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
urlStatus := protocol.GetURLStatus(invoker.GetUrl())
if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.GetOutStandingRequestConutLimit() {
logger.Debugf("Invoker [%s] is currently in circuitbreaker tripped state", invoker.GetUrl().Key())
return false
}
return true
}

// isCircuitBreakerTripped determine whether the invoker is in the tripped state by the number of successive bad request
func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool {
circuitBreakerTimeout := c.getCircuitBreakerTimeout(status)
currentTime := protocol.CurrentTimeMillis()
if circuitBreakerTimeout <= 0 {
return false
}
return circuitBreakerTimeout > currentTime
}

// getCircuitBreakerTimeout get the timestamp recovered from tripped state, the unit is millisecond
func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 {
sleepWindow := c.getCircuitBreakerSleepWindowTime(status)
if sleepWindow <= 0 {
return 0
}
return status.GetLastRequestFailedTimestamp() + sleepWindow
}

// getCircuitBreakerSleepWindowTime get the sleep window time of invoker, the unit is millisecond
func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 {

successiveFailureCount := status.GetSuccessiveRequestFailureCount()
diff := successiveFailureCount - c.GetRequestSuccessiveFailureThreshold()
if diff < 0 {
return 0
} else if diff > constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF {
diff = constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF
}
sleepWindow := (1 << diff) * c.GetCircuitTrippedTimeoutFactor()
if sleepWindow > constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS {
sleepWindow = constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS
}
return int64(sleepWindow)
}

// GetOutStandingRequestConutLimit return the requestSuccessiveFailureThreshold bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetRequestSuccessiveFailureThreshold() int32 {
return c.requestSuccessiveFailureThreshold
}

// GetOutStandingRequestConutLimit return the circuitTrippedTimeoutFactor bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetCircuitTrippedTimeoutFactor() int32 {
return c.circuitTrippedTimeoutFactor
}

// GetOutStandingRequestConutLimit return the outStandingRequestConutLimit bound to this DefaultHealthChecker
func (c *DefaultHealthChecker) GetOutStandingRequestConutLimit() int32 {
return c.outStandingRequestConutLimit
}

// NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url
func NewDefaultHealthChecker(url *common.URL) router.HealthChecker {
return &DefaultHealthChecker{
outStandingRequestConutLimit: int32(url.GetParamInt(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)),
requestSuccessiveFailureThreshold: int32(url.GetParamInt(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF)),
circuitTrippedTimeoutFactor: int32(url.GetParamInt(constant.CIRCUIT_TRIPPED_TIMEOUT_FACTOR_KEY, constant.DEFAULT_CIRCUIT_TRIPPED_TIMEOUT_FACTOR)),
}
}
157 changes: 157 additions & 0 deletions cluster/router/healthcheck/default_health_check_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package healthcheck

import (
"math"
"testing"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
)

func TestDefaultHealthChecker_IsHealthy(t *testing.T) {

defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
hc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
invoker := NewMockInvoker(url, 1)
healthy := hc.IsHealthy(invoker)
assert.True(t, healthy)

url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "100")
// fake the outgoing request
for i := 0; i < 11; i++ {
request(url, "test", 0, true, false)
}
hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
// the outgoing request is more than OUTSTANDING_REQUEST_COUNT_LIMIT, go to unhealthy
assert.False(t, hc.IsHealthy(invoker))

// successive failed count is more than constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, go to unhealthy
for i := 0; i < 11; i++ {
request(url, "test", 0, false, false)
}
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
url.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "1000")
hc = NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
healthy = hc.IsHealthy(invoker)
assert.False(t, hc.IsHealthy(invoker))

// reset successive failed count and go to healthy
request(url, "test", 0, false, true)
healthy = hc.IsHealthy(invoker)
assert.True(t, hc.IsHealthy(invoker))
}

func TestDefaultHealthChecker_getCircuitBreakerSleepWindowTime(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, "test", 1, false, false)
}
sleepWindowTime := defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)

// Adjust the threshold size to 1000
url.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "1000")
sleepWindowTime = NewDefaultHealthChecker(&url).(*DefaultHealthChecker).getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url))
assert.True(t, sleepWindowTime == 0)

url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime == 0)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
sleepWindowTime = defaultHc.getCircuitBreakerSleepWindowTime(protocol.GetURLStatus(url1))
assert.True(t, sleepWindowTime > 0 && sleepWindowTime < constant.MAX_CIRCUIT_TRIPPED_TIMEOUT_IN_MS)
}

func TestDefaultHealthChecker_getCircuitBreakerTimeout(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
timeout := defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url))
assert.True(t, timeout == 0)
url1, _ := common.NewURL("dubbo://192.168.10.11:20000/com.ikurento.user.UserProvider")
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
request(url1, "test", 1, false, false)
timeout = defaultHc.getCircuitBreakerTimeout(protocol.GetURLStatus(url1))
// timeout must after the current time
assert.True(t, timeout > protocol.CurrentTimeMillis())

}

func TestDefaultHealthChecker_isCircuitBreakerTripped(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
status := protocol.GetURLStatus(url)
tripped := defaultHc.isCircuitBreakerTripped(status)
assert.False(t, tripped)
// Increase the number of failed requests
for i := 0; i < 100; i++ {
request(url, "test", 1, false, false)
}
tripped = defaultHc.isCircuitBreakerTripped(protocol.GetURLStatus(url))
assert.True(t, tripped)

}

func TestNewDefaultHealthChecker(t *testing.T) {
defer protocol.CleanAllStatus()
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
defaultHc := NewDefaultHealthChecker(&url).(*DefaultHealthChecker)
assert.NotNil(t, defaultHc)
assert.Equal(t, defaultHc.outStandingRequestConutLimit, int32(math.MaxInt32))
assert.Equal(t, defaultHc.requestSuccessiveFailureThreshold, int32(constant.DEFAULT_SUCCESSIVE_FAILED_REQUEST_MAX_DIFF))

url1, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
url1.SetParam(constant.OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, "10")
url1.SetParam(constant.SUCCESSIVE_FAILED_REQUEST_THRESHOLD_KEY, "10")
nondefaultHc := NewDefaultHealthChecker(&url1).(*DefaultHealthChecker)
assert.NotNil(t, nondefaultHc)
assert.Equal(t, nondefaultHc.outStandingRequestConutLimit, int32(10))
assert.Equal(t, nondefaultHc.requestSuccessiveFailureThreshold, int32(10))
}

func request(url common.URL, method string, elapsed int64, active, succeeded bool) {
protocol.BeginCount(url, method)
if !active {
protocol.EndCount(url, method, elapsed, succeeded)
}
}
43 changes: 43 additions & 0 deletions cluster/router/healthcheck/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package healthcheck

import (
"github.com/apache/dubbo-go/cluster/router"
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
)

func init() {
extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory)
}

// HealthCheckRouteFactory
type HealthCheckRouteFactory struct {
}

// newHealthCheckRouteFactory construct a new HealthCheckRouteFactory
func newHealthCheckRouteFactory() router.RouterFactory {
return &HealthCheckRouteFactory{}
}

// NewRouter construct a new NewHealthCheckRouter via url
func (f *HealthCheckRouteFactory) NewRouter(url *common.URL) (router.Router, error) {
return NewHealthCheckRouter(url)
}
Loading

0 comments on commit ab850a4

Please sign in to comment.