Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit ab7ddf0

Browse files
authoredFeb 20, 2019
Merge pull request #74 from ipfs/feat/differentiate_wantList
More specific wantlists
2 parents 472a8ab + 95f6e62 commit ab7ddf0

11 files changed

+186
-167
lines changed
 

‎bitswap.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
9797
return nil
9898
})
9999

100-
peerQueueFactory := func(p peer.ID) bspm.PeerQueue {
101-
return bsmq.New(p, network)
100+
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
101+
return bsmq.New(ctx, p, network)
102102
}
103103

104104
wm := bswm.New(ctx)

‎decision/peer_request_queue.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
6060
defer partner.activelk.Unlock()
6161

6262
var priority int
63-
newEntries := make([]*wantlist.Entry, 0, len(entries))
63+
newEntries := make([]*peerRequestTaskEntry, 0, len(entries))
6464
for _, entry := range entries {
6565
if partner.activeBlocks.Has(entry.Cid) {
6666
continue
@@ -75,7 +75,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
7575
if entry.Priority > priority {
7676
priority = entry.Priority
7777
}
78-
newEntries = append(newEntries, entry)
78+
newEntries = append(newEntries, &peerRequestTaskEntry{entry, false})
7979
}
8080

8181
if len(newEntries) == 0 {
@@ -86,7 +86,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
8686
Entries: newEntries,
8787
Target: to,
8888
created: time.Now(),
89-
Done: func(e []*wantlist.Entry) {
89+
Done: func(e []*peerRequestTaskEntry) {
9090
tl.lock.Lock()
9191
for _, entry := range e {
9292
partner.TaskDone(entry.Cid)
@@ -117,10 +117,10 @@ func (tl *prq) Pop() *peerRequestTask {
117117
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
118118
out = partner.taskQueue.Pop().(*peerRequestTask)
119119

120-
newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
120+
newEntries := make([]*peerRequestTaskEntry, 0, len(out.Entries))
121121
for _, entry := range out.Entries {
122122
delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
123-
if entry.Trash {
123+
if entry.trash {
124124
continue
125125
}
126126
partner.requests--
@@ -150,7 +150,7 @@ func (tl *prq) Remove(k cid.Cid, p peer.ID) {
150150
// remove the task "lazily"
151151
// simply mark it as trash, so it'll be dropped when popped off the
152152
// queue.
153-
entry.Trash = true
153+
entry.trash = true
154154
break
155155
}
156156
}
@@ -197,13 +197,18 @@ func (tl *prq) thawRound() {
197197
}
198198
}
199199

200+
type peerRequestTaskEntry struct {
201+
*wantlist.Entry
202+
// trash in a book-keeping field
203+
trash bool
204+
}
200205
type peerRequestTask struct {
201-
Entries []*wantlist.Entry
206+
Entries []*peerRequestTaskEntry
202207
Priority int
203208
Target peer.ID
204209

205210
// A callback to signal that this task has been completed
206-
Done func([]*wantlist.Entry)
211+
Done func([]*peerRequestTaskEntry)
207212

208213
// created marks the time that the task was added to the queue
209214
created time.Time

‎messagequeue/messagequeue.go

+99-82
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package messagequeue
22

33
import (
44
"context"
5-
"sync"
65
"time"
76

87
bsmsg "github.com/ipfs/go-bitswap/message"
@@ -23,86 +22,99 @@ type MessageNetwork interface {
2322
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
2423
}
2524

25+
type request interface {
26+
handle(mq *MessageQueue)
27+
}
28+
2629
// MessageQueue implements queue of want messages to send to peers.
2730
type MessageQueue struct {
28-
p peer.ID
29-
30-
outlk sync.Mutex
31-
out bsmsg.BitSwapMessage
31+
ctx context.Context
32+
p peer.ID
3233
network MessageNetwork
33-
wl *wantlist.ThreadSafe
3434

35-
sender bsnet.MessageSender
35+
newRequests chan request
36+
outgoingMessages chan bsmsg.BitSwapMessage
37+
done chan struct{}
38+
39+
// do not touch out of run loop
40+
wl *wantlist.SessionTrackedWantlist
41+
nextMessage bsmsg.BitSwapMessage
42+
sender bsnet.MessageSender
43+
}
44+
45+
type messageRequest struct {
46+
entries []*bsmsg.Entry
47+
ses uint64
48+
}
3649

37-
work chan struct{}
38-
done chan struct{}
50+
type wantlistRequest struct {
51+
wl *wantlist.SessionTrackedWantlist
3952
}
4053

4154
// New creats a new MessageQueue.
42-
func New(p peer.ID, network MessageNetwork) *MessageQueue {
55+
func New(ctx context.Context, p peer.ID, network MessageNetwork) *MessageQueue {
4356
return &MessageQueue{
44-
done: make(chan struct{}),
45-
work: make(chan struct{}, 1),
46-
wl: wantlist.NewThreadSafe(),
47-
network: network,
48-
p: p,
57+
ctx: ctx,
58+
wl: wantlist.NewSessionTrackedWantlist(),
59+
network: network,
60+
p: p,
61+
newRequests: make(chan request, 16),
62+
outgoingMessages: make(chan bsmsg.BitSwapMessage),
63+
done: make(chan struct{}),
4964
}
5065
}
5166

5267
// AddMessage adds new entries to an outgoing message for a given session.
5368
func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
54-
if !mq.addEntries(entries, ses) {
55-
return
56-
}
5769
select {
58-
case mq.work <- struct{}{}:
59-
default:
70+
case mq.newRequests <- &messageRequest{entries, ses}:
71+
case <-mq.ctx.Done():
6072
}
6173
}
6274

6375
// AddWantlist adds a complete session tracked want list to a message queue
64-
func (mq *MessageQueue) AddWantlist(initialEntries []*wantlist.Entry) {
65-
if len(initialEntries) > 0 {
66-
if mq.out == nil {
67-
mq.out = bsmsg.New(false)
68-
}
76+
func (mq *MessageQueue) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {
77+
wl := wantlist.NewSessionTrackedWantlist()
78+
initialWants.CopyWants(wl)
6979

70-
for _, e := range initialEntries {
71-
for k := range e.SesTrk {
72-
mq.wl.AddEntry(e, k)
73-
}
74-
mq.out.AddEntry(e.Cid, e.Priority)
75-
}
76-
77-
select {
78-
case mq.work <- struct{}{}:
79-
default:
80-
}
80+
select {
81+
case mq.newRequests <- &wantlistRequest{wl}:
82+
case <-mq.ctx.Done():
8183
}
8284
}
8385

8486
// Startup starts the processing of messages, and creates an initial message
8587
// based on the given initial wantlist.
86-
func (mq *MessageQueue) Startup(ctx context.Context) {
87-
go mq.runQueue(ctx)
88+
func (mq *MessageQueue) Startup() {
89+
go mq.runQueue()
90+
go mq.sendMessages()
8891
}
8992

9093
// Shutdown stops the processing of messages for a message queue.
9194
func (mq *MessageQueue) Shutdown() {
9295
close(mq.done)
9396
}
9497

95-
func (mq *MessageQueue) runQueue(ctx context.Context) {
98+
func (mq *MessageQueue) runQueue() {
99+
outgoingMessages := func() chan bsmsg.BitSwapMessage {
100+
if mq.nextMessage == nil {
101+
return nil
102+
}
103+
return mq.outgoingMessages
104+
}
105+
96106
for {
97107
select {
98-
case <-mq.work: // there is work to be done
99-
mq.doWork(ctx)
108+
case newRequest := <-mq.newRequests:
109+
newRequest.handle(mq)
110+
case outgoingMessages() <- mq.nextMessage:
111+
mq.nextMessage = nil
100112
case <-mq.done:
101113
if mq.sender != nil {
102114
mq.sender.Close()
103115
}
104116
return
105-
case <-ctx.Done():
117+
case <-mq.ctx.Done():
106118
if mq.sender != nil {
107119
mq.sender.Reset()
108120
}
@@ -111,72 +123,86 @@ func (mq *MessageQueue) runQueue(ctx context.Context) {
111123
}
112124
}
113125

114-
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) bool {
115-
var work bool
116-
mq.outlk.Lock()
117-
defer mq.outlk.Unlock()
118-
// if we have no message held allocate a new one
119-
if mq.out == nil {
120-
mq.out = bsmsg.New(false)
126+
func (mr *messageRequest) handle(mq *MessageQueue) {
127+
mq.addEntries(mr.entries, mr.ses)
128+
}
129+
130+
func (wr *wantlistRequest) handle(mq *MessageQueue) {
131+
initialWants := wr.wl
132+
initialWants.CopyWants(mq.wl)
133+
if initialWants.Len() > 0 {
134+
if mq.nextMessage == nil {
135+
mq.nextMessage = bsmsg.New(false)
136+
}
137+
for _, e := range initialWants.Entries() {
138+
mq.nextMessage.AddEntry(e.Cid, e.Priority)
139+
}
121140
}
141+
}
122142

123-
// TODO: add a msg.Combine(...) method
124-
// otherwise, combine the one we are holding with the
125-
// one passed in
143+
func (mq *MessageQueue) addEntries(entries []*bsmsg.Entry, ses uint64) {
126144
for _, e := range entries {
127145
if e.Cancel {
128146
if mq.wl.Remove(e.Cid, ses) {
129-
work = true
130-
mq.out.Cancel(e.Cid)
147+
if mq.nextMessage == nil {
148+
mq.nextMessage = bsmsg.New(false)
149+
}
150+
mq.nextMessage.Cancel(e.Cid)
131151
}
132152
} else {
133153
if mq.wl.Add(e.Cid, e.Priority, ses) {
134-
work = true
135-
mq.out.AddEntry(e.Cid, e.Priority)
154+
if mq.nextMessage == nil {
155+
mq.nextMessage = bsmsg.New(false)
156+
}
157+
mq.nextMessage.AddEntry(e.Cid, e.Priority)
136158
}
137159
}
138160
}
139-
140-
return work
141161
}
142162

143-
func (mq *MessageQueue) doWork(ctx context.Context) {
144-
145-
wlm := mq.extractOutgoingMessage()
146-
if wlm == nil || wlm.Empty() {
147-
return
163+
func (mq *MessageQueue) sendMessages() {
164+
for {
165+
select {
166+
case nextMessage := <-mq.outgoingMessages:
167+
mq.sendMessage(nextMessage)
168+
case <-mq.done:
169+
return
170+
case <-mq.ctx.Done():
171+
return
172+
}
148173
}
174+
}
175+
176+
func (mq *MessageQueue) sendMessage(message bsmsg.BitSwapMessage) {
149177

150-
// NB: only open a stream if we actually have data to send
151-
err := mq.initializeSender(ctx)
178+
err := mq.initializeSender()
152179
if err != nil {
153180
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
154181
// TODO: cant connect, what now?
155182
return
156183
}
157184

158-
// send wantlist updates
159185
for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
160-
if mq.attemptSendAndRecovery(ctx, wlm) {
186+
if mq.attemptSendAndRecovery(message) {
161187
return
162188
}
163189
}
164190
}
165191

166-
func (mq *MessageQueue) initializeSender(ctx context.Context) error {
192+
func (mq *MessageQueue) initializeSender() error {
167193
if mq.sender != nil {
168194
return nil
169195
}
170-
nsender, err := openSender(ctx, mq.network, mq.p)
196+
nsender, err := openSender(mq.ctx, mq.network, mq.p)
171197
if err != nil {
172198
return err
173199
}
174200
mq.sender = nsender
175201
return nil
176202
}
177203

178-
func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.BitSwapMessage) bool {
179-
err := mq.sender.SendMsg(ctx, wlm)
204+
func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
205+
err := mq.sender.SendMsg(mq.ctx, message)
180206
if err == nil {
181207
return true
182208
}
@@ -188,14 +214,14 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
188214
select {
189215
case <-mq.done:
190216
return true
191-
case <-ctx.Done():
217+
case <-mq.ctx.Done():
192218
return true
193219
case <-time.After(time.Millisecond * 100):
194220
// wait 100ms in case disconnect notifications are still propogating
195221
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
196222
}
197223

198-
err = mq.initializeSender(ctx)
224+
err = mq.initializeSender()
199225
if err != nil {
200226
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
201227
// TODO(why): what do we do now?
@@ -215,15 +241,6 @@ func (mq *MessageQueue) attemptSendAndRecovery(ctx context.Context, wlm bsmsg.Bi
215241
return false
216242
}
217243

218-
func (mq *MessageQueue) extractOutgoingMessage() bsmsg.BitSwapMessage {
219-
// grab outgoing message
220-
mq.outlk.Lock()
221-
wlm := mq.out
222-
mq.out = nil
223-
mq.outlk.Unlock()
224-
return wlm
225-
}
226-
227244
func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
228245
// allow ten minutes for connections this includes looking them up in the
229246
// dht dialing them, and handshaking

‎messagequeue/messagequeue_test.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (bsnet
2727
return fmn.messageSender, nil
2828
}
2929
return nil, fmn.messageSenderError
30-
3130
}
3231

3332
type fakeMessageSender struct {
@@ -77,12 +76,12 @@ func TestStartupAndShutdown(t *testing.T) {
7776
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
7877
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
7978
peerID := testutil.GeneratePeers(1)[0]
80-
messageQueue := New(peerID, fakenet)
79+
messageQueue := New(ctx, peerID, fakenet)
8180
ses := testutil.GenerateSessionID()
8281
wl := testutil.GenerateWantlist(10, ses)
8382

84-
messageQueue.Startup(ctx)
85-
messageQueue.AddWantlist(wl.Entries())
83+
messageQueue.Startup()
84+
messageQueue.AddWantlist(wl)
8685
messages := collectMessages(ctx, t, messagesSent, 10*time.Millisecond)
8786
if len(messages) != 1 {
8887
t.Fatal("wrong number of messages were sent for initial wants")
@@ -119,11 +118,11 @@ func TestSendingMessagesDeduped(t *testing.T) {
119118
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
120119
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
121120
peerID := testutil.GeneratePeers(1)[0]
122-
messageQueue := New(peerID, fakenet)
121+
messageQueue := New(ctx, peerID, fakenet)
123122
ses1 := testutil.GenerateSessionID()
124123
ses2 := testutil.GenerateSessionID()
125124
entries := testutil.GenerateMessageEntries(10, false)
126-
messageQueue.Startup(ctx)
125+
messageQueue.Startup()
127126

128127
messageQueue.AddMessage(entries, ses1)
129128
messageQueue.AddMessage(entries, ses2)
@@ -142,13 +141,13 @@ func TestSendingMessagesPartialDupe(t *testing.T) {
142141
fakeSender := &fakeMessageSender{nil, fullClosedChan, resetChan, messagesSent}
143142
fakenet := &fakeMessageNetwork{nil, nil, fakeSender}
144143
peerID := testutil.GeneratePeers(1)[0]
145-
messageQueue := New(peerID, fakenet)
144+
messageQueue := New(ctx, peerID, fakenet)
146145
ses1 := testutil.GenerateSessionID()
147146
ses2 := testutil.GenerateSessionID()
148147
entries := testutil.GenerateMessageEntries(10, false)
149148
moreEntries := testutil.GenerateMessageEntries(5, false)
150149
secondEntries := append(entries[5:], moreEntries...)
151-
messageQueue.Startup(ctx)
150+
messageQueue.Startup()
152151

153152
messageQueue.AddMessage(entries, ses1)
154153
messageQueue.AddMessage(secondEntries, ses2)

‎peermanager/peermanager.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ var (
2020
// PeerQueue provides a queer of messages to be sent for a single peer.
2121
type PeerQueue interface {
2222
AddMessage(entries []*bsmsg.Entry, ses uint64)
23-
Startup(ctx context.Context)
24-
AddWantlist(initialEntries []*wantlist.Entry)
23+
Startup()
24+
AddWantlist(initialWants *wantlist.SessionTrackedWantlist)
2525
Shutdown()
2626
}
2727

2828
// PeerQueueFactory provides a function that will create a PeerQueue.
29-
type PeerQueueFactory func(p peer.ID) PeerQueue
29+
type PeerQueueFactory func(ctx context.Context, p peer.ID) PeerQueue
3030

3131
type peerMessage interface {
3232
handle(pm *PeerManager)
@@ -69,13 +69,13 @@ func (pm *PeerManager) ConnectedPeers() []peer.ID {
6969

7070
// Connected is called to add a new peer to the pool, and send it an initial set
7171
// of wants.
72-
func (pm *PeerManager) Connected(p peer.ID, initialEntries []*wantlist.Entry) {
72+
func (pm *PeerManager) Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist) {
7373
pm.peerQueuesLk.Lock()
7474

7575
pq := pm.getOrCreate(p)
7676

7777
if pq.refcnt == 0 {
78-
pq.pq.AddWantlist(initialEntries)
78+
pq.pq.AddWantlist(initialWants)
7979
}
8080

8181
pq.refcnt++
@@ -128,8 +128,8 @@ func (pm *PeerManager) SendMessage(entries []*bsmsg.Entry, targets []peer.ID, fr
128128
func (pm *PeerManager) getOrCreate(p peer.ID) *peerQueueInstance {
129129
pqi, ok := pm.peerQueues[p]
130130
if !ok {
131-
pq := pm.createPeerQueue(p)
132-
pq.Startup(pm.ctx)
131+
pq := pm.createPeerQueue(pm.ctx, p)
132+
pq.Startup()
133133
pqi = &peerQueueInstance{0, pq}
134134
pm.peerQueues[p] = pqi
135135
}

‎peermanager/peermanager_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ type fakePeer struct {
2424
messagesSent chan messageSent
2525
}
2626

27-
func (fp *fakePeer) Startup(ctx context.Context) {}
28-
func (fp *fakePeer) Shutdown() {}
27+
func (fp *fakePeer) Startup() {}
28+
func (fp *fakePeer) Shutdown() {}
2929

3030
func (fp *fakePeer) AddMessage(entries []*bsmsg.Entry, ses uint64) {
3131
fp.messagesSent <- messageSent{fp.p, entries, ses}
3232
}
33-
func (fp *fakePeer) AddWantlist(initialEntries []*wantlist.Entry) {}
33+
func (fp *fakePeer) AddWantlist(initialWants *wantlist.SessionTrackedWantlist) {}
3434
func makePeerQueueFactory(messagesSent chan messageSent) PeerQueueFactory {
35-
return func(p peer.ID) PeerQueue {
35+
return func(ctx context.Context, p peer.ID) PeerQueue {
3636
return &fakePeer{
3737
p: p,
3838
messagesSent: messagesSent,

‎testutil/testutil.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func GenerateCids(n int) []cid.Cid {
3939
}
4040

4141
// GenerateWantlist makes a populated wantlist.
42-
func GenerateWantlist(n int, ses uint64) *wantlist.ThreadSafe {
43-
wl := wantlist.NewThreadSafe()
42+
func GenerateWantlist(n int, ses uint64) *wantlist.SessionTrackedWantlist {
43+
wl := wantlist.NewSessionTrackedWantlist()
4444
for i := 0; i < n; i++ {
4545
prioritySeq++
4646
entry := wantlist.NewRefEntry(blockGenerator.Next().Cid(), prioritySeq)

‎wantlist/wantlist.go

+42-44
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,36 @@
1-
// package wantlist implements an object for bitswap that contains the keys
1+
// Package wantlist implements an object for bitswap that contains the keys
22
// that a given peer wants.
33
package wantlist
44

55
import (
66
"sort"
7-
"sync"
87

98
cid "github.com/ipfs/go-cid"
109
)
1110

12-
type ThreadSafe struct {
13-
lk sync.RWMutex
14-
set map[cid.Cid]*Entry
11+
type SessionTrackedWantlist struct {
12+
set map[cid.Cid]*sessionTrackedEntry
1513
}
1614

17-
// not threadsafe
1815
type Wantlist struct {
1916
set map[cid.Cid]*Entry
2017
}
2118

2219
type Entry struct {
2320
Cid cid.Cid
2421
Priority int
22+
}
2523

26-
SesTrk map[uint64]struct{}
27-
// Trash in a book-keeping field
28-
Trash bool
24+
type sessionTrackedEntry struct {
25+
*Entry
26+
sesTrk map[uint64]struct{}
2927
}
3028

3129
// NewRefEntry creates a new reference tracked wantlist entry.
3230
func NewRefEntry(c cid.Cid, p int) *Entry {
3331
return &Entry{
3432
Cid: c,
3533
Priority: p,
36-
SesTrk: make(map[uint64]struct{}),
3734
}
3835
}
3936

@@ -43,9 +40,9 @@ func (es entrySlice) Len() int { return len(es) }
4340
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
4441
func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority }
4542

46-
func NewThreadSafe() *ThreadSafe {
47-
return &ThreadSafe{
48-
set: make(map[cid.Cid]*Entry),
43+
func NewSessionTrackedWantlist() *SessionTrackedWantlist {
44+
return &SessionTrackedWantlist{
45+
set: make(map[cid.Cid]*sessionTrackedEntry),
4946
}
5047
}
5148

@@ -63,50 +60,46 @@ func New() *Wantlist {
6360
// TODO: think through priority changes here
6461
// Add returns true if the cid did not exist in the wantlist before this call
6562
// (even if it was under a different session).
66-
func (w *ThreadSafe) Add(c cid.Cid, priority int, ses uint64) bool {
67-
w.lk.Lock()
68-
defer w.lk.Unlock()
63+
func (w *SessionTrackedWantlist) Add(c cid.Cid, priority int, ses uint64) bool {
64+
6965
if e, ok := w.set[c]; ok {
70-
e.SesTrk[ses] = struct{}{}
66+
e.sesTrk[ses] = struct{}{}
7167
return false
7268
}
7369

74-
w.set[c] = &Entry{
75-
Cid: c,
76-
Priority: priority,
77-
SesTrk: map[uint64]struct{}{ses: struct{}{}},
70+
w.set[c] = &sessionTrackedEntry{
71+
Entry: &Entry{Cid: c, Priority: priority},
72+
sesTrk: map[uint64]struct{}{ses: struct{}{}},
7873
}
7974

8075
return true
8176
}
8277

8378
// AddEntry adds given Entry to the wantlist. For more information see Add method.
84-
func (w *ThreadSafe) AddEntry(e *Entry, ses uint64) bool {
85-
w.lk.Lock()
86-
defer w.lk.Unlock()
79+
func (w *SessionTrackedWantlist) AddEntry(e *Entry, ses uint64) bool {
8780
if ex, ok := w.set[e.Cid]; ok {
88-
ex.SesTrk[ses] = struct{}{}
81+
ex.sesTrk[ses] = struct{}{}
8982
return false
9083
}
91-
w.set[e.Cid] = e
92-
e.SesTrk[ses] = struct{}{}
84+
w.set[e.Cid] = &sessionTrackedEntry{
85+
Entry: e,
86+
sesTrk: map[uint64]struct{}{ses: struct{}{}},
87+
}
9388
return true
9489
}
9590

9691
// Remove removes the given cid from being tracked by the given session.
9792
// 'true' is returned if this call to Remove removed the final session ID
9893
// tracking the cid. (meaning true will be returned iff this call caused the
9994
// value of 'Contains(c)' to change from true to false)
100-
func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool {
101-
w.lk.Lock()
102-
defer w.lk.Unlock()
95+
func (w *SessionTrackedWantlist) Remove(c cid.Cid, ses uint64) bool {
10396
e, ok := w.set[c]
10497
if !ok {
10598
return false
10699
}
107100

108-
delete(e.SesTrk, ses)
109-
if len(e.SesTrk) == 0 {
101+
delete(e.sesTrk, ses)
102+
if len(e.sesTrk) == 0 {
110103
delete(w.set, c)
111104
return true
112105
}
@@ -115,35 +108,40 @@ func (w *ThreadSafe) Remove(c cid.Cid, ses uint64) bool {
115108

116109
// Contains returns true if the given cid is in the wantlist tracked by one or
117110
// more sessions.
118-
func (w *ThreadSafe) Contains(k cid.Cid) (*Entry, bool) {
119-
w.lk.RLock()
120-
defer w.lk.RUnlock()
111+
func (w *SessionTrackedWantlist) Contains(k cid.Cid) (*Entry, bool) {
121112
e, ok := w.set[k]
122-
return e, ok
113+
if !ok {
114+
return nil, false
115+
}
116+
return e.Entry, true
123117
}
124118

125-
func (w *ThreadSafe) Entries() []*Entry {
126-
w.lk.RLock()
127-
defer w.lk.RUnlock()
119+
func (w *SessionTrackedWantlist) Entries() []*Entry {
128120
es := make([]*Entry, 0, len(w.set))
129121
for _, e := range w.set {
130-
es = append(es, e)
122+
es = append(es, e.Entry)
131123
}
132124
return es
133125
}
134126

135-
func (w *ThreadSafe) SortedEntries() []*Entry {
127+
func (w *SessionTrackedWantlist) SortedEntries() []*Entry {
136128
es := w.Entries()
137129
sort.Sort(entrySlice(es))
138130
return es
139131
}
140132

141-
func (w *ThreadSafe) Len() int {
142-
w.lk.RLock()
143-
defer w.lk.RUnlock()
133+
func (w *SessionTrackedWantlist) Len() int {
144134
return len(w.set)
145135
}
146136

137+
func (w *SessionTrackedWantlist) CopyWants(to *SessionTrackedWantlist) {
138+
for _, e := range w.set {
139+
for k := range e.sesTrk {
140+
to.AddEntry(e.Entry, k)
141+
}
142+
}
143+
}
144+
147145
func (w *Wantlist) Len() int {
148146
return len(w.set)
149147
}

‎wantlist/wantlist_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ func TestBasicWantlist(t *testing.T) {
8282
}
8383
}
8484

85-
func TestSesRefWantlist(t *testing.T) {
86-
wl := NewThreadSafe()
85+
func TestSessionTrackedWantlist(t *testing.T) {
86+
wl := NewSessionTrackedWantlist()
8787

8888
if !wl.Add(testcids[0], 5, 1) {
8989
t.Fatal("should have added")

‎wantmanager/wantmanager.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ const (
2424
// managed by the WantManager.
2525
type PeerHandler interface {
2626
Disconnected(p peer.ID)
27-
Connected(p peer.ID, initialEntries []*wantlist.Entry)
27+
Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist)
2828
SendMessage(entries []*bsmsg.Entry, targets []peer.ID, from uint64)
2929
}
3030

@@ -42,8 +42,8 @@ type WantManager struct {
4242
wantMessages chan wantMessage
4343

4444
// synchronized by Run loop, only touch inside there
45-
wl *wantlist.ThreadSafe
46-
bcwl *wantlist.ThreadSafe
45+
wl *wantlist.SessionTrackedWantlist
46+
bcwl *wantlist.SessionTrackedWantlist
4747

4848
ctx context.Context
4949
cancel func()
@@ -59,8 +59,8 @@ func New(ctx context.Context) *WantManager {
5959
"Number of items in wantlist.").Gauge()
6060
return &WantManager{
6161
wantMessages: make(chan wantMessage, 10),
62-
wl: wantlist.NewThreadSafe(),
63-
bcwl: wantlist.NewThreadSafe(),
62+
wl: wantlist.NewSessionTrackedWantlist(),
63+
bcwl: wantlist.NewSessionTrackedWantlist(),
6464
ctx: ctx,
6565
cancel: cancel,
6666
wantlistGauge: wantlistGauge,
@@ -274,7 +274,7 @@ type connectedMessage struct {
274274
}
275275

276276
func (cm *connectedMessage) handle(wm *WantManager) {
277-
wm.peerHandler.Connected(cm.p, wm.bcwl.Entries())
277+
wm.peerHandler.Connected(cm.p, wm.bcwl)
278278
}
279279

280280
type disconnectedMessage struct {

‎wantmanager/wantmanager_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ func (fph *fakePeerHandler) SendMessage(entries []*bsmsg.Entry, targets []peer.I
2525
fph.lk.Unlock()
2626
}
2727

28-
func (fph *fakePeerHandler) Connected(p peer.ID, initialEntries []*wantlist.Entry) {}
29-
func (fph *fakePeerHandler) Disconnected(p peer.ID) {}
28+
func (fph *fakePeerHandler) Connected(p peer.ID, initialWants *wantlist.SessionTrackedWantlist) {}
29+
func (fph *fakePeerHandler) Disconnected(p peer.ID) {}
3030

3131
func (fph *fakePeerHandler) getLastWantSet() wantSet {
3232
fph.lk.Lock()

0 commit comments

Comments
 (0)
This repository has been archived.