Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingSinger committed Mar 4, 2020
1 parent df90d9b commit 65ff41c
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 3 deletions.
16 changes: 13 additions & 3 deletions cluster/router/condition/default_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@ func init() {
extension.SethealthChecker(DEFAULT_HEALTH_CHECKER, NewDefaultHealthChecker)
}

// DefaultHealthChecker is the default
// DefaultHealthChecker is the default implementation of HealthChecker, which determines the health status of
// the invoker based on the number of successive bad request and the current active request.
type DefaultHealthChecker struct {
// OutStandingRequestConutLimit
OutStandingRequestConutLimit int32
// the circuitbreaker threshold
// RequestSuccessiveFailureThreshold
RequestSuccessiveFailureThreshold int32
CircuitTrippedTimeoutFactor int32
// RequestSuccessiveFailureThreshold
CircuitTrippedTimeoutFactor int32
}

// IsHealthy evaluates the healthy state on the given Invoker based on the number of successive bad request
// and the current active request
func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
urlStatus := protocol.GetURLStatus(invoker.GetUrl())
if c.isCircuitBreakerTripped(urlStatus) || urlStatus.GetActive() > c.OutStandingRequestConutLimit {
Expand All @@ -44,6 +49,8 @@ func (c *DefaultHealthChecker) IsHealthy(invoker protocol.Invoker) bool {
}
return true
}

// isCircuitBreakerTripped determine whether the invoker is in the tripped state by the number of successive bad request
func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatus) bool {
circuitBreakerTimeout := c.getCircuitBreakerTimeout(status)
currentTime := protocol.CurrentTimeMillis()
Expand All @@ -53,6 +60,7 @@ func (c *DefaultHealthChecker) isCircuitBreakerTripped(status *protocol.RPCStatu
return circuitBreakerTimeout > currentTime
}

// getCircuitBreakerTimeout get the timestamp recovered from tripped state
func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStatus) int64 {
sleepWindow := c.getCircuitBreakerSleepWindowTime(status)
if sleepWindow <= 0 {
Expand All @@ -61,6 +69,7 @@ func (c *DefaultHealthChecker) getCircuitBreakerTimeout(status *protocol.RPCStat
return status.GetLastRequestFailedTimestamp() + sleepWindow
}

// getCircuitBreakerSleepWindowTime get the sleep window time of invoker
func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol.RPCStatus) int64 {

successiveFailureCount := status.GetSuccessiveRequestFailureCount()
Expand All @@ -77,6 +86,7 @@ func (c *DefaultHealthChecker) getCircuitBreakerSleepWindowTime(status *protocol
return int64(sleepWindow)
}

// NewDefaultHealthChecker constructs a new DefaultHealthChecker based on the url
func NewDefaultHealthChecker(url *common.URL) router.HealthChecker {
return &DefaultHealthChecker{
OutStandingRequestConutLimit: int32(url.GetParamInt(OUTSTANDING_REQUEST_COUNT_LIMIT_KEY, math.MaxInt32)),
Expand Down
1 change: 1 addition & 0 deletions cluster/router/condition/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
func init() {
extension.SetRouterFactory(constant.ConditionRouterName, newConditionRouterFactory)
extension.SetRouterFactory(constant.ConditionAppRouterName, newAppRouterFactory)
extension.SetRouterFactory(constant.HealthCheckRouterName, newHealthCheckRouteFactory)
}

// ConditionRouterFactory Condition router factory
Expand Down
7 changes: 7 additions & 0 deletions cluster/router/condition/health_check_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ const (
HEALTH_ROUTE_ENABLED_KEY = "health.route.enabled"
)

// HealthCheckRouter provides a health-first routing mechanism through HealthChecker
type HealthCheckRouter struct {
url *common.URL
enabled bool
checker router.HealthChecker
}

// NewHealthCheckRouter construct an HealthCheckRouter via url
func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
r := &HealthCheckRouter{}
r.url = url
Expand All @@ -29,16 +31,19 @@ func NewHealthCheckRouter(url *common.URL) (router.Router, error) {
return r, nil
}

// Route gets a list of healthy invoker
func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL, invocation protocol.Invocation) []protocol.Invoker {
if !r.enabled {
return invokers
}
var healthyInvokers []protocol.Invoker
// Add healthy invoker to the list
for _, invoker := range invokers {
if r.checker.IsHealthy(invoker) {
healthyInvokers = append(healthyInvokers, invoker)
}
}
// If all Invoke are considered unhealthy, downgrade to all inovker
if len(healthyInvokers) == 0 {
logger.Warnf(" Now all invokers are unhealthy, so downgraded to all! Service: [%s]", url.ServiceKey())
return invokers
Expand All @@ -47,6 +52,7 @@ func (r *HealthCheckRouter) Route(invokers []protocol.Invoker, url *common.URL,
}
}

// Priority
func (r *HealthCheckRouter) Priority() int64 {
return 0
}
Expand All @@ -56,6 +62,7 @@ func (r *HealthCheckRouter) URL() common.URL {
return *r.url
}

// HealthyChecker returns the HealthChecker bound to this HealthCheckRouter
func (r *HealthCheckRouter) HealthyChecker() router.HealthChecker {
return r.checker
}
2 changes: 2 additions & 0 deletions cluster/router/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/apache/dubbo-go/protocol"
)

// HealthChecker is used to determine whether the invoker is healthy or not
type HealthChecker interface {
// IsHealthy evaluates the healthy state on the given Invoker
IsHealthy(invoker protocol.Invoker) bool
}
1 change: 1 addition & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ const (
ConditionAppRouterName = "app"
// ListenableRouterName Specify listenable router name
ListenableRouterName = "listenable"
HealthCheckRouterName = "health_check"

// ConditionRouterRuleSuffix Specify condition router suffix
ConditionRouterRuleSuffix = ".condition-router"
Expand Down
2 changes: 2 additions & 0 deletions common/extension/health_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ var (
healthCheckers = make(map[string]func(url *common.URL) router.HealthChecker)
)

// SethealthChecker set the HealthChecker with name
func SethealthChecker(name string, fcn func(url *common.URL) router.HealthChecker) {
healthCheckers[name] = fcn
}

// GetHealthChecker get the HealthChecker with name
func GetHealthChecker(name string, url *common.URL) router.HealthChecker {
if healthCheckers[name] == nil {
panic("healthCheckers for " + name + " is not existing, make sure you have import the package.")
Expand Down

0 comments on commit 65ff41c

Please sign in to comment.