Skip to content

Commit 058422d

Browse files
authored
improve mfs republisher (#754)
* improve mfs republisher - Get updated values while retrying to publish - Prefer Close to lifecycle context - Do not require call to Start separate from New
1 parent 0c321cc commit 058422d

File tree

3 files changed

+141
-92
lines changed

3 files changed

+141
-92
lines changed

mfs/repub.go

+94-68
Original file line numberDiff line numberDiff line change
@@ -2,43 +2,47 @@ package mfs
22

33
import (
44
"context"
5+
"errors"
6+
"sync"
57
"time"
68

79
cid "github.com/ipfs/go-cid"
810
)
911

12+
// closeTimeout is how long to wait for current publishing to finish before
13+
// shutting down the republisher.
14+
const closeTimeout = 5 * time.Second
15+
1016
// PubFunc is the user-defined function that determines exactly what
1117
// logic entails "publishing" a `Cid` value.
1218
type PubFunc func(context.Context, cid.Cid) error
1319

1420
// Republisher manages when to publish a given entry.
1521
type Republisher struct {
16-
TimeoutLong time.Duration
17-
TimeoutShort time.Duration
18-
RetryTimeout time.Duration
19-
pubfunc PubFunc
20-
22+
pubfunc PubFunc
2123
update chan cid.Cid
2224
immediatePublish chan chan struct{}
2325

24-
ctx context.Context
25-
cancel func()
26+
cancel func()
27+
closeOnce sync.Once
28+
stopped chan struct{}
2629
}
2730

2831
// NewRepublisher creates a new Republisher object to republish the given root
2932
// using the given short and long time intervals.
30-
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
31-
ctx, cancel := context.WithCancel(ctx)
32-
return &Republisher{
33-
TimeoutShort: tshort,
34-
TimeoutLong: tlong,
35-
RetryTimeout: tlong,
33+
func NewRepublisher(pf PubFunc, tshort, tlong time.Duration, lastPublished cid.Cid) *Republisher {
34+
ctx, cancel := context.WithCancel(context.Background())
35+
rp := &Republisher{
3636
update: make(chan cid.Cid, 1),
3737
pubfunc: pf,
3838
immediatePublish: make(chan chan struct{}),
39-
ctx: ctx,
4039
cancel: cancel,
40+
stopped: make(chan struct{}),
4141
}
42+
43+
go rp.run(ctx, tshort, tlong, lastPublished)
44+
45+
return rp
4246
}
4347

4448
// WaitPub waits for the current value to be published (or returns early
@@ -58,10 +62,22 @@ func (rp *Republisher) WaitPub(ctx context.Context) error {
5862
}
5963
}
6064

65+
// Close tells the republisher to stop and waits for it to stop.
6166
func (rp *Republisher) Close() error {
62-
// TODO(steb): Wait for `Run` to stop
63-
err := rp.WaitPub(rp.ctx)
64-
rp.cancel()
67+
var err error
68+
rp.closeOnce.Do(func() {
69+
// Wait a short amount of time for any current publishing to finish.
70+
ctx, cancel := context.WithTimeout(context.Background(), closeTimeout)
71+
err = rp.WaitPub(ctx)
72+
if errors.Is(err, context.DeadlineExceeded) {
73+
err = errors.New("mfs/republisher: timed out waiting to publish during close")
74+
}
75+
cancel()
76+
// Shutdown the publisher.
77+
rp.cancel()
78+
})
79+
// Wait for publisher to stop and then return.
80+
<-rp.stopped
6581
return err
6682
}
6783

@@ -82,22 +98,28 @@ func (rp *Republisher) Update(c cid.Cid) {
8298
}
8399

84100
// Run contains the core logic of the `Republisher`. It calls the user-defined
85-
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
86-
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
87-
// updates.
101+
// `pubfunc` function whenever the `Cid` value is updated to a *new* value.
102+
// Since calling the `pubfunc` may be slow, updates are batched
88103
//
89104
// Algorithm:
90-
// 1. When we receive the first update after publishing, we set a `longer` timer.
91-
// 2. When we receive any update, we reset the `quick` timer.
92-
// 3. If either the `quick` timeout or the `longer` timeout elapses,
93-
// we call `publish` with the latest updated value.
105+
// 1. When receiving the first update after publishing, set a `longer` timer
106+
// 2. When receiving any update, reset the `quick` timer
107+
// 3. If either the `quick` timeout or the `longer` timeout elapses, call
108+
// `publish` with the latest updated value.
109+
//
110+
// The `longer` timer ensures that publishing is delayed by at most that
111+
// duration. The `quick` timer allows publishing sooner if there are no more
112+
// updates available.
94113
//
95-
// The `longer` timer ensures that we delay publishing by at most
96-
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
97-
// it looks like there are no more updates coming down the pipe.
114+
// In other words, the quick timeout means there are no more values to put into
115+
// the "batch", so do update. The long timeout means that the "batch"
116+
// is full, so do update, even though there are still values (no quick timeout
117+
// yet) arriving.
98118
//
99-
// Note: If a publish fails, we retry repeatedly every TimeoutRetry.
100-
func (rp *Republisher) Run(lastPublished cid.Cid) {
119+
// If a publish fails, retry repeatedly every `longer` timeout.
120+
func (rp *Republisher) run(ctx context.Context, timeoutShort, timeoutLong time.Duration, lastPublished cid.Cid) {
121+
defer close(rp.stopped)
122+
101123
quick := time.NewTimer(0)
102124
if !quick.Stop() {
103125
<-quick.C
@@ -107,12 +129,13 @@ func (rp *Republisher) Run(lastPublished cid.Cid) {
107129
<-longer.C
108130
}
109131

132+
immediatePublish := rp.immediatePublish
110133
var toPublish cid.Cid
111-
for rp.ctx.Err() == nil {
112-
var waiter chan struct{}
134+
var waiter chan struct{}
113135

136+
for {
114137
select {
115-
case <-rp.ctx.Done():
138+
case <-ctx.Done():
116139
return
117140
case newValue := <-rp.update:
118141
// Skip already published values.
@@ -123,19 +146,20 @@ func (rp *Republisher) Run(lastPublished cid.Cid) {
123146
break
124147
}
125148

126-
// If we aren't already waiting to publish something,
127-
// reset the long timeout.
149+
// If not already waiting to publish something, reset the long
150+
// timeout.
128151
if !toPublish.Defined() {
129-
longer.Reset(rp.TimeoutLong)
152+
longer.Reset(timeoutLong)
130153
}
131154

132155
// Always reset the short timeout.
133-
quick.Reset(rp.TimeoutShort)
156+
quick.Reset(timeoutShort)
134157

135158
// Finally, set the new value to publish.
136159
toPublish = newValue
160+
// Wait for a newer value or the quick timer.
137161
continue
138-
case waiter = <-rp.immediatePublish:
162+
case waiter = <-immediatePublish:
139163
// Make sure to grab the *latest* value to publish.
140164
select {
141165
case toPublish = <-rp.update:
@@ -147,60 +171,62 @@ func (rp *Republisher) Run(lastPublished cid.Cid) {
147171
toPublish = cid.Undef
148172
}
149173
case <-quick.C:
174+
// Waited a short time for more updates and no more received.
150175
case <-longer.C:
176+
// Keep getting updates and now it is time to send what has been
177+
// received so far.
151178
}
152179

153180
// Cleanup, publish, and close waiters.
154181

155-
// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
156-
// idiom as these timers may not be running.
157-
182+
// 1. Stop any timers.
158183
quick.Stop()
184+
longer.Stop()
185+
186+
// Do not use the `if !t.Stop() { ... }` idiom as these timers may not
187+
// be running.
188+
//
189+
// TODO: remove after go1.23 required.
159190
select {
160191
case <-quick.C:
161192
default:
162193
}
163-
164-
longer.Stop()
165194
select {
166195
case <-longer.C:
167196
default:
168197
}
169198

170-
// 2. If we have a value to publish, publish it now.
199+
// 2. If there is a value to publish then publish it now.
171200
if toPublish.Defined() {
172-
var timer *time.Timer
173-
for {
174-
err := rp.pubfunc(rp.ctx, toPublish)
175-
if err == nil {
176-
break
177-
}
178-
179-
if timer == nil {
180-
timer = time.NewTimer(rp.RetryTimeout)
181-
defer timer.Stop()
182-
} else {
183-
timer.Reset(rp.RetryTimeout)
184-
}
185-
186-
// Keep retrying until we succeed or we abort.
187-
// TODO(steb): We could try pulling new values
188-
// off `update` but that's not critical (and
189-
// complicates this code a bit). We'll pull off
190-
// a new value on the next loop through.
191-
select {
192-
case <-timer.C:
193-
case <-rp.ctx.Done():
194-
return
195-
}
201+
err := rp.pubfunc(ctx, toPublish)
202+
if err != nil {
203+
// Republish failed, so retry after waiting for long timeout.
204+
//
205+
// Instead of entering a retry loop here, go back to waiting
206+
// for more values and retrying to publish after the long
207+
// timeout. Keep using the current waiter until it has been
208+
// notified of a successful publish.
209+
//
210+
// Reset the long timer as it effectively becomes the retry
211+
// timeout.
212+
longer.Reset(timeoutLong)
213+
// Stop reading waiters from immediatePublish while retrying.
214+
// This causes the current waiter to be notified only after a
215+
// successful call to pubfunc, and is what constitutes a retry.
216+
immediatePublish = nil
217+
continue
196218
}
197219
lastPublished = toPublish
198220
toPublish = cid.Undef
221+
// Resume reading waiters.
222+
immediatePublish = rp.immediatePublish
199223
}
200224

201-
// 3. Trigger anything waiting in `WaitPub`.
225+
// 3. Notify anything waiting in `WaitPub` on successful call to
226+
// pubfunc or if nothing to publish.
202227
if waiter != nil {
203228
close(waiter)
229+
waiter = nil
204230
}
205231
}
206232
}

mfs/repub_test.go

+40-14
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,22 @@ import (
77

88
cid "github.com/ipfs/go-cid"
99
ci "github.com/libp2p/go-libp2p-testing/ci"
10+
"github.com/stretchr/testify/require"
1011
)
1112

1213
func TestRepublisher(t *testing.T) {
1314
if ci.IsRunning() {
1415
t.Skip("dont run timing tests in CI")
1516
}
1617

17-
ctx := context.TODO()
18-
1918
pub := make(chan struct{})
2019

2120
pf := func(ctx context.Context, c cid.Cid) error {
22-
pub <- struct{}{}
21+
select {
22+
case pub <- struct{}{}:
23+
case <-ctx.Done():
24+
return ctx.Err()
25+
}
2326
return nil
2427
}
2528

@@ -29,8 +32,7 @@ func TestRepublisher(t *testing.T) {
2932
tshort := time.Millisecond * 50
3033
tlong := time.Second / 2
3134

32-
rp := NewRepublisher(ctx, pf, tshort, tlong)
33-
go rp.Run(cid.Undef)
35+
rp := NewRepublisher(pf, tshort, tlong, cid.Undef)
3436

3537
rp.Update(testCid1)
3638

@@ -41,16 +43,17 @@ func TestRepublisher(t *testing.T) {
4143
case <-pub:
4244
}
4345

44-
cctx, cancel := context.WithCancel(context.Background())
45-
46+
stopUpdates := make(chan struct{})
4647
go func() {
48+
timer := time.NewTimer(time.Hour)
49+
defer timer.Stop()
4750
for {
4851
rp.Update(testCid2)
49-
time.Sleep(time.Millisecond * 10)
52+
timer.Reset(time.Millisecond * 10)
5053
select {
51-
case <-cctx.Done():
54+
case <-timer.C:
55+
case <-stopUpdates:
5256
return
53-
default:
5457
}
5558
}
5659
}()
@@ -66,10 +69,33 @@ func TestRepublisher(t *testing.T) {
6669
t.Fatal("waited too long for pub!")
6770
}
6871

69-
cancel()
72+
close(stopUpdates)
7073

71-
err := rp.Close()
72-
if err != nil {
73-
t.Fatal(err)
74+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
75+
defer cancel()
76+
77+
// Check that republishing update does not call pubfunc again
78+
rp.Update(testCid2)
79+
err := rp.WaitPub(context.Background())
80+
require.NoError(t, err)
81+
select {
82+
case <-pub:
83+
t.Fatal("pub func called again with repeated update")
84+
case <-time.After(tlong * 2):
7485
}
86+
87+
// Check that waitpub times out when blocked pubfunc is called
88+
rp.Update(testCid1)
89+
err = rp.WaitPub(ctx)
90+
require.ErrorIs(t, err, context.DeadlineExceeded)
91+
92+
// Unblock pubfunc.
93+
<-pub
94+
95+
err = rp.Close()
96+
require.NoError(t, err)
97+
98+
// Check that additional call to Close is OK after republisher stopped.
99+
err = rp.Close()
100+
require.NoError(t, err)
75101
}

0 commit comments

Comments
 (0)