Skip to content

Commit 9e65f29

Browse files
authored
Merge pull request #232 from libp2p/revert-225-tidy-up-bootstrapping
Revert "Tidy up bootstrapping"
2 parents 66cc80c + 201eea5 commit 9e65f29

File tree

4 files changed

+121
-76
lines changed

4 files changed

+121
-76
lines changed

dht.go

-10
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,6 @@ type IpfsDHT struct {
6464
protocols []protocol.ID // DHT protocols
6565
}
6666

67-
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
68-
// guarantee, but we can use them to aid refactoring.
69-
var (
70-
_ routing.ContentRouting = (*IpfsDHT)(nil)
71-
_ routing.IpfsRouting = (*IpfsDHT)(nil)
72-
_ routing.PeerRouting = (*IpfsDHT)(nil)
73-
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
74-
_ routing.ValueStore = (*IpfsDHT)(nil)
75-
)
76-
7767
// New creates a new DHT with the specified host and options.
7868
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
7969
var cfg opts.Options

dht_bootstrap.go

+104-62
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import (
77
"time"
88

99
u "github.com/ipfs/go-ipfs-util"
10+
goprocess "github.com/jbenet/goprocess"
11+
periodicproc "github.com/jbenet/goprocess/periodic"
1012
peer "github.com/libp2p/go-libp2p-peer"
11-
pstore "github.com/libp2p/go-libp2p-peerstore"
1213
routing "github.com/libp2p/go-libp2p-routing"
1314
)
1415

@@ -38,73 +39,87 @@ var DefaultBootstrapConfig = BootstrapConfig{
3839
Timeout: time.Duration(10 * time.Second),
3940
}
4041

41-
// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
42-
// the default bootstrap config.
42+
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
43+
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
44+
// process will run a number of queries each time, and run every time signal fires.
45+
// These parameters are configurable.
46+
//
47+
// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
4348
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
44-
return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
49+
proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
50+
if err != nil {
51+
return err
52+
}
53+
54+
// wait till ctx or dht.Context exits.
55+
// we have to do it this way to satisfy the Routing interface (contexts)
56+
go func() {
57+
defer proc.Close()
58+
select {
59+
case <-ctx.Done():
60+
case <-dht.Context().Done():
61+
}
62+
}()
63+
64+
return nil
4565
}
4666

47-
// Runs cfg.Queries bootstrap queries every cfg.Period.
48-
func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
49-
// Because this method is not synchronous, we have to duplicate sanity
50-
// checks on the config so that callers aren't oblivious.
67+
// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
68+
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
69+
// process will run a number of queries each time, and run every time signal fires.
70+
// These parameters are configurable.
71+
//
72+
// BootstrapWithConfig returns a process, so the user can stop it.
73+
func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
5174
if cfg.Queries <= 0 {
52-
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
75+
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
5376
}
54-
go func() {
77+
78+
proc := dht.Process().Go(func(p goprocess.Process) {
79+
<-p.Go(dht.bootstrapWorker(cfg)).Closed()
5580
for {
56-
err := dht.runBootstrap(ctx, cfg)
57-
if err != nil {
58-
log.Warningf("error bootstrapping: %s", err)
59-
}
6081
select {
6182
case <-time.After(cfg.Period):
62-
case <-ctx.Done():
83+
<-p.Go(dht.bootstrapWorker(cfg)).Closed()
84+
case <-p.Closing():
6385
return
6486
}
6587
}
66-
}()
67-
return nil
88+
})
89+
90+
return proc, nil
6891
}
6992

70-
// This is a synchronous bootstrap. cfg.Queries queries will run each with a
71-
// timeout of cfg.Timeout. cfg.Period is not used.
72-
func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
93+
// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
94+
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
95+
// process will run a number of queries each time, and run every time signal fires.
96+
// These parameters are configurable.
97+
//
98+
// SignalBootstrap returns a process, so the user can stop it.
99+
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
73100
if cfg.Queries <= 0 {
74-
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
101+
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
75102
}
76-
return dht.runBootstrap(ctx, cfg)
77-
}
78103

79-
func newRandomPeerId() peer.ID {
80-
id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
81-
rand.Read(id)
82-
id = u.Hash(id) // TODO: Feed this directly into the multihash instead of hashing it.
83-
return peer.ID(id)
84-
}
104+
if signal == nil {
105+
return nil, fmt.Errorf("invalid signal: %v", signal)
106+
}
85107

86-
// Traverse the DHT toward the given ID.
87-
func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
88-
// TODO: Extract the query action (traversal logic?) inside FindPeer,
89-
// don't actually call through the FindPeer machinery, which can return
90-
// things out of the peer store etc.
91-
return dht.FindPeer(ctx, target)
108+
proc := periodicproc.Ticker(signal, dht.bootstrapWorker(cfg))
109+
110+
return proc, nil
92111
}
93112

94-
// Traverse the DHT toward a random ID.
95-
func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
96-
id := newRandomPeerId()
97-
p, err := dht.walk(ctx, id)
98-
switch err {
99-
case routing.ErrNotFound:
100-
return nil
101-
case nil:
102-
// We found a peer from a randomly generated ID. This should be very
103-
// unlikely.
104-
log.Warningf("random walk toward %s actually found peer: %s", id, p)
105-
return nil
106-
default:
107-
return err
113+
func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
114+
return func(worker goprocess.Process) {
115+
// it would be useful to be able to send out signals of when we bootstrap, too...
116+
// maybe this is a good case for whole module event pub/sub?
117+
118+
ctx := dht.Context()
119+
if err := dht.runBootstrap(ctx, cfg); err != nil {
120+
log.Warning(err)
121+
// A bootstrapping error is important to notice but not fatal.
122+
}
108123
}
109124
}
110125

@@ -117,24 +132,51 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
117132
defer bslog("end")
118133
defer log.EventBegin(ctx, "dhtRunBootstrap").Done()
119134

120-
doQuery := func(n int, target string, f func(context.Context) error) error {
121-
log.Debugf("Bootstrapping query (%d/%d) to %s", n, cfg.Queries, target)
135+
var merr u.MultiErr
136+
137+
randomID := func() peer.ID {
138+
// 16 random bytes is not a valid peer id. it may be fine becuase
139+
// the dht will rehash to its own keyspace anyway.
140+
id := make([]byte, 16)
141+
rand.Read(id)
142+
id = u.Hash(id)
143+
return peer.ID(id)
144+
}
145+
146+
// bootstrap sequentially, as results will compound
147+
runQuery := func(ctx context.Context, id peer.ID) {
122148
ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
123149
defer cancel()
124-
return f(ctx)
125-
}
126150

127-
// Do all but one of the bootstrap queries as random walks.
128-
for i := 1; i < cfg.Queries; i++ {
129-
err := doQuery(i, "random ID", dht.randomWalk)
130-
if err != nil {
131-
return err
151+
p, err := dht.FindPeer(ctx, id)
152+
if err == routing.ErrNotFound {
153+
// this isn't an error. this is precisely what we expect.
154+
} else if err != nil {
155+
merr = append(merr, err)
156+
} else {
157+
// woah, actually found a peer with that ID? this shouldn't happen normally
158+
// (as the ID we use is not a real ID). this is an odd error worth logging.
159+
err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
160+
log.Warningf("%s", err)
161+
merr = append(merr, err)
132162
}
133163
}
134164

165+
// these should be parallel normally. but can make them sequential for debugging.
166+
// note that the core/bootstrap context deadline should be extended too for that.
167+
for i := 0; i < cfg.Queries; i++ {
168+
id := randomID()
169+
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
170+
runQuery(ctx, id)
171+
}
172+
135173
// Find self to distribute peer info to our neighbors.
136-
return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), func(ctx context.Context) error {
137-
_, err := dht.walk(ctx, dht.self)
138-
return err
139-
})
174+
// Do this after bootstrapping.
175+
log.Debugf("Bootstrapping query to self: %s", dht.self)
176+
runQuery(ctx, dht.self)
177+
178+
if len(merr) > 0 {
179+
return merr
180+
}
181+
return nil
140182
}

dht_test.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -679,10 +679,23 @@ func TestPeriodicBootstrap(t *testing.T) {
679679
}
680680
}()
681681

682+
signals := []chan time.Time{}
683+
682684
var cfg BootstrapConfig
683685
cfg = DefaultBootstrapConfig
684686
cfg.Queries = 5
685687

688+
// kick off periodic bootstrappers with instrumented signals.
689+
for _, dht := range dhts {
690+
s := make(chan time.Time)
691+
signals = append(signals, s)
692+
proc, err := dht.BootstrapOnSignal(cfg, s)
693+
if err != nil {
694+
t.Fatal(err)
695+
}
696+
defer proc.Close()
697+
}
698+
686699
t.Logf("dhts are not connected. %d", nDHTs)
687700
for _, dht := range dhts {
688701
rtlen := dht.routingTable.Size()
@@ -708,8 +721,9 @@ func TestPeriodicBootstrap(t *testing.T) {
708721
}
709722

710723
t.Logf("bootstrapping them so they find each other. %d", nDHTs)
711-
for _, dht := range dhts {
712-
go dht.BootstrapOnce(ctx, cfg)
724+
now := time.Now()
725+
for _, signal := range signals {
726+
go func(s chan time.Time) { s <- now }(signal)
713727
}
714728

715729
// this is async, and we dont know when it's finished with one cycle, so keep checking

routing.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
routing "github.com/libp2p/go-libp2p-routing"
2222
notif "github.com/libp2p/go-libp2p-routing/notifications"
2323
ropts "github.com/libp2p/go-libp2p-routing/options"
24-
"github.com/pkg/errors"
2524
)
2625

2726
// asyncQueryBuffer is the size of buffered channels in async queries. This
@@ -584,7 +583,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo
584583

585584
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
586585
if len(peers) == 0 {
587-
return pstore.PeerInfo{}, errors.WithStack(kb.ErrLookupFailure)
586+
return pstore.PeerInfo{}, kb.ErrLookupFailure
588587
}
589588

590589
// Sanity...

0 commit comments

Comments
 (0)