Skip to content

Commit cdac50e

Browse files
authored
Merge pull request #1038 from qazwsxedckll/dev
bug fixes
2 parents ef91a6a + 225f8e7 commit cdac50e

17 files changed

+55
-32
lines changed

actor/throttler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func NewThrottle(maxEventsInPeriod int32, period time.Duration, throttledCallBac
5858
func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period time.Duration, throttledCallBack func(*slog.Logger, int32)) ShouldThrottle {
5959
currentEvents := int32(0)
6060

61-
startTimer := func(duration time.Duration, back func(*slog.Logger, int32)) {
61+
startTimer := func(duration time.Duration) {
6262
go func() {
6363
// crete ticker to mimic sleep, we do not want to put the goroutine to sleep
6464
// as it will schedule it out of the P making a syscall, we just want it to
@@ -77,7 +77,7 @@ func NewThrottleWithLogger(logger *slog.Logger, maxEventsInPeriod int32, period
7777
return func() Valve {
7878
tries := atomic.AddInt32(&currentEvents, 1)
7979
if tries == 1 {
80-
startTimer(period, throttledCallBack)
80+
startTimer(period)
8181
}
8282

8383
if tries == maxEventsInPeriod {

cluster/clusterproviders/k8s/k8s_provider.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ func (p *Provider) startClusterMonitor(c *cluster.Cluster) error {
159159
p.clusterMonitor, err = c.ActorSystem.Root.SpawnNamed(actor.PropsFromProducer(func() actor.Actor {
160160
return newClusterMonitor(p)
161161
}), "k8s-cluster-monitor")
162-
163162
if err != nil {
164163
p.cluster.Logger().Error("Failed to start k8s-cluster-monitor actor", slog.Any("error", err))
165164
return err
@@ -177,7 +176,7 @@ func (p *Provider) registerMemberAsync(c *cluster.Cluster) {
177176

178177
// registers itself as a member in k8s cluster
179178
func (p *Provider) registerMember(timeout time.Duration) error {
180-
p.cluster.Logger().Info(fmt.Sprintf("Registering service %s on %s", p.podName, p.address))
179+
p.cluster.Logger().Info("Registering service in Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address))
181180

182181
ctx, cancel := context.WithTimeout(context.Background(), timeout)
183182
defer cancel()
@@ -187,7 +186,7 @@ func (p *Provider) registerMember(timeout time.Duration) error {
187186
return fmt.Errorf("unable to get own pod information for %s: %w", p.podName, err)
188187
}
189188

190-
p.cluster.Logger().Info(fmt.Sprintf("Using Kubernetes namespace: %s\nUsing Kubernetes port: %d", pod.Namespace, p.port))
189+
p.cluster.Logger().Info("Using Kubernetes namespace", slog.String("namespace", pod.Namespace), slog.Int("port", p.port))
191190

192191
labels := Labels{
193192
LabelCluster: p.clusterName,
@@ -218,7 +217,7 @@ func (p *Provider) startWatchingClusterAsync(c *cluster.Cluster) {
218217
func (p *Provider) startWatchingCluster() error {
219218
selector := fmt.Sprintf("%s=%s", LabelCluster, p.clusterName)
220219

221-
p.cluster.Logger().Debug(fmt.Sprintf("Starting to watch pods with %s", selector), slog.String("selector", selector))
220+
p.cluster.Logger().Debug("Starting to watch pods", slog.String("selector", selector))
222221

223222
ctx, cancel := context.WithCancel(context.Background())
224223
p.cancelWatch = cancel
@@ -365,7 +364,7 @@ func mapPodsToMembers(clusterPods map[types.UID]*v1.Pod, logger *slog.Logger) []
365364

366365
// deregister itself as a member from a k8s cluster
367366
func (p *Provider) deregisterMember(timeout time.Duration) error {
368-
p.cluster.Logger().Info(fmt.Sprintf("Deregistering service %s from %s", p.podName, p.address))
367+
p.cluster.Logger().Info("Deregistering service from Kubernetes", slog.String("podName", p.podName), slog.String("address", p.address))
369368

370369
ctx, cancel := context.WithTimeout(context.Background(), timeout)
371370
defer cancel()
@@ -419,7 +418,7 @@ func (p *Provider) retrieveNamespace() string {
419418
filename := filepath.Join(string(filepath.Separator), "var", "run", "secrets", "kubernetes.io", "serviceaccount", "namespace")
420419
content, err := os.ReadFile(filename)
421420
if err != nil {
422-
p.cluster.Logger().Warn(fmt.Sprintf("Could not read %s contents defaulting to empty namespace: %s", filename, err.Error()))
421+
p.cluster.Logger().Warn("Could not read contents, defaulting to empty namespace", slog.String("filename", filename), slog.Any("error", err))
423422
return p.namespace
424423
}
425424
p.namespace = string(content)

cluster/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type Config struct {
1717
RemoteConfig *remote.Config
1818
RequestTimeoutTime time.Duration
1919
RequestsLogThrottlePeriod time.Duration
20+
RequestLog bool
2021
MaxNumberOfEventsInRequestLogThrottledPeriod int
2122
ClusterContextProducer ContextProducer
2223
MemberStrategyBuilder func(cluster *Cluster, kind string) MemberStrategy
@@ -62,7 +63,6 @@ func Configure(clusterName string, clusterProvider ClusterProvider, identityLook
6263
// into a valid ClusterContextConfig value and returns a pointer to its memory
6364
func (c *Config) ToClusterContextConfig(logger *slog.Logger) *ClusterContextConfig {
6465
clusterContextConfig := ClusterContextConfig{
65-
6666
RequestsLogThrottlePeriod: c.RequestsLogThrottlePeriod,
6767
MaxNumberOfEventsInRequestLogThrottledPeriod: c.MaxNumberOfEventsInRequestLogThrottledPeriod,
6868

cluster/config_opts.go

+6
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,9 @@ func WithHeartbeatExpiration(t time.Duration) ConfigOption {
5454
c.HeartbeatExpiration = t
5555
}
5656
}
57+
58+
func WithRequestLog(enabled bool) ConfigOption {
59+
return func(c *Config) {
60+
c.RequestLog = enabled
61+
}
62+
}

cluster/default_context.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"fmt"
88
"log/slog"
9+
"reflect"
910
"time"
1011

1112
"github.com/asynkron/protoactor-go/actor"
@@ -50,7 +51,7 @@ func (dcc *DefaultContext) Request(identity, kind string, message interface{}, o
5051

5152
start := time.Now()
5253

53-
dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting %s:%s Message %#v", identity, kind, message))
54+
dcc.cluster.Logger().Debug("Requesting", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message))
5455

5556
// crate a new Timeout Context
5657
ttl := callConfig.Timeout
@@ -120,7 +121,7 @@ func (dcc *DefaultContext) RequestFuture(identity string, kind string, message i
120121

121122
_context := callConfig.Context
122123

123-
dcc.cluster.Logger().Debug(fmt.Sprintf("Requesting future %s:%s Message %#v", identity, kind, message))
124+
dcc.cluster.Logger().Debug("Requesting future", slog.String("identity", identity), slog.String("kind", kind), slog.String("type", reflect.TypeOf(message).String()), slog.Any("message", message))
124125

125126
// crate a new Timeout Context
126127
ttl := callConfig.Timeout

cluster/gossiper.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func newGossiper(cl *Cluster, opts ...Option) (*Gossiper, error) {
7070

7171
func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
7272
if g.throttler() == actor.Open {
73-
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper getting state from %s", g.pid))
73+
g.cluster.Logger().Debug("Gossiper getting state", slog.String("key", key), slog.String("remote", g.pid.String()))
7474
}
7575

7676
msg := NewGetGossipStateRequest(key)
@@ -104,7 +104,7 @@ func (g *Gossiper) GetState(key string) (map[string]*GossipKeyValue, error) {
104104
// SetState Sends fire and forget message to update member state
105105
func (g *Gossiper) SetState(key string, value proto.Message) {
106106
if g.throttler() == actor.Open {
107-
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid))
107+
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
108108
}
109109

110110
if g.pid == nil {
@@ -118,7 +118,7 @@ func (g *Gossiper) SetState(key string, value proto.Message) {
118118
// SetStateRequest Sends a Request (that blocks) to update member state
119119
func (g *Gossiper) SetStateRequest(key string, value proto.Message) error {
120120
if g.throttler() == actor.Open {
121-
g.cluster.Logger().Debug(fmt.Sprintf("Gossiper setting state %s to %s", key, g.pid))
121+
g.cluster.Logger().Debug("Gossiper setting state", slog.String("key", key), slog.String("remote", g.pid.String()))
122122
}
123123

124124
if g.pid == nil {
@@ -186,7 +186,6 @@ func (g *Gossiper) StartGossiping() error {
186186
system,
187187
)
188188
}), g.GossipActorName)
189-
190189
if err != nil {
191190
g.cluster.Logger().Error("Failed to start gossip actor", slog.Any("error", err))
192191
return err
@@ -300,5 +299,5 @@ func (g *Gossiper) blockGracefullyLeft() {
300299
}
301300

302301
func (g *Gossiper) throttledLog(counter int32) {
303-
g.cluster.Logger().Debug(fmt.Sprintf("[Gossiper] Gossiper Setting State to %s", g.pid), slog.Int("throttled", int(counter)))
302+
g.cluster.Logger().Debug("Gossiper Setting State", slog.String("remote", g.pid.String()), slog.Int("throttled", int(counter)))
304303
}

cluster/pubsub_topic.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,14 @@ func (t *TopicActor) logDeliveryErrors(reports []*SubscriberDeliveryReport, logg
170170
}
171171

172172
// unsubscribeUnreachablePidSubscribers deletes all subscribers that have a PID that is unreachable
173-
func (t *TopicActor) unsubscribeUnreachablePidSubscribers(_ actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) {
173+
func (t *TopicActor) unsubscribeUnreachablePidSubscribers(c actor.Context, allInvalidDeliveryReports []*SubscriberDeliveryReport) {
174174
subscribers := make([]subscribeIdentityStruct, 0, len(allInvalidDeliveryReports))
175175
for _, r := range allInvalidDeliveryReports {
176176
if r.Subscriber.GetPid() != nil && r.Status == DeliveryStatus_SubscriberNoLongerReachable {
177177
subscribers = append(subscribers, newSubscribeIdentityStruct(r.Subscriber))
178178
}
179179
}
180-
t.removeSubscribers(subscribers, nil)
180+
t.removeSubscribers(subscribers, c.Logger())
181181
}
182182

183183
// onClusterTopologyChanged handles a ClusterTopology message
@@ -217,7 +217,7 @@ func (t *TopicActor) unsubscribeSubscribersOnMembersThatLeft(c actor.Context) {
217217
}
218218
}
219219
}
220-
t.removeSubscribers(subscribersThatLeft, nil)
220+
t.removeSubscribers(subscribersThatLeft, c.Logger())
221221
}
222222

223223
// removeSubscribers remove subscribers from the topic

protobuf/protoc-gen-go-grain/templates/grain.tmpl

+3
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ func (g *{{ $service.Name }}GrainClient) {{ $method.Name }}Future(r *{{ $method.
7979
{{ end }}
8080
// {{ $method.Name }} requests the execution on to the cluster with CallOptions
8181
func (g *{{ $service.Name }}GrainClient) {{ $method.Name }}(r *{{ $method.Input }}, opts ...cluster.GrainCallOption) (*{{ $method.Output }}, error) {
82+
if g.cluster.Config.RequestLog {
83+
g.cluster.Logger().Info("Requesting", slog.String("identity", g.Identity), slog.String("kind", "{{ $service.Name }}"), slog.String("method", "{{ $method.Name }}"), slog.Any("request", r))
84+
}
8285
bytes, err := proto.Marshal(r)
8386
if err != nil {
8487
return nil, err

protobuf/protoc-gen-go-grain/test/error/error.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobuf/protoc-gen-go-grain/test/error/error_grain.pb.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobuf/protoc-gen-go-grain/test/hello/hello.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobuf/protoc-gen-go-grain/test/hello/hello_grain.pb.go

+5-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobuf/protoc-gen-go-grain/test/multi-services/services.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobuf/protoc-gen-go-grain/test/multi-services/services_grain.pb.go

+8-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobuf/protoc-gen-go-grain/test/reenter/hello.pb.go

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protobuf/protoc-gen-go-grain/test/reenter/hello_grain.pb.go

+8-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
package main
22

3-
const version = "v0.6.1"
3+
const version = "v0.7.0"

0 commit comments

Comments
 (0)