Skip to content

Commit db919d1

Browse files
authored
Merge pull request ipfs/go-bitswap#25 from ipfs/feat/dup-blocks-test-enhancement
feat(Benchmarks): Add real world dup blocks test This commit was moved from ipfs/go-bitswap@edf2496
2 parents d72f163 + cec6f76 commit db919d1

4 files changed

+223
-28
lines changed

bitswap/dup_blocks_test.go

+52-21
Original file line numberDiff line numberDiff line change
@@ -33,71 +33,102 @@ type runStats struct {
3333
var benchmarkLog []runStats
3434

3535
func BenchmarkDups2Nodes(b *testing.B) {
36+
fixedDelay := delay.Fixed(10 * time.Millisecond)
3637
b.Run("AllToAll-OneAtATime", func(b *testing.B) {
37-
subtestDistributeAndFetch(b, 3, 100, allToAll, oneAtATime)
38+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
3839
})
3940
b.Run("AllToAll-BigBatch", func(b *testing.B) {
40-
subtestDistributeAndFetch(b, 3, 100, allToAll, batchFetchAll)
41+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, batchFetchAll)
4142
})
4243

4344
b.Run("Overlap1-OneAtATime", func(b *testing.B) {
44-
subtestDistributeAndFetch(b, 3, 100, overlap1, oneAtATime)
45+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap1, oneAtATime)
4546
})
4647

4748
b.Run("Overlap2-BatchBy10", func(b *testing.B) {
48-
subtestDistributeAndFetch(b, 3, 100, overlap2, batchFetchBy10)
49+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap2, batchFetchBy10)
4950
})
5051

5152
b.Run("Overlap3-OneAtATime", func(b *testing.B) {
52-
subtestDistributeAndFetch(b, 3, 100, overlap3, oneAtATime)
53+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, oneAtATime)
5354
})
5455
b.Run("Overlap3-BatchBy10", func(b *testing.B) {
55-
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchBy10)
56+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchBy10)
5657
})
5758
b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
58-
subtestDistributeAndFetch(b, 3, 100, overlap3, fetchAllConcurrent)
59+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, fetchAllConcurrent)
5960
})
6061
b.Run("Overlap3-BigBatch", func(b *testing.B) {
61-
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchAll)
62+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchAll)
6263
})
6364
b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
64-
subtestDistributeAndFetch(b, 3, 100, overlap3, unixfsFileFetch)
65+
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, unixfsFileFetch)
6566
})
6667
b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
67-
subtestDistributeAndFetch(b, 10, 100, allToAll, oneAtATime)
68+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, oneAtATime)
6869
})
6970
b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
70-
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchBy10)
71+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchBy10)
7172
})
7273
b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
73-
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchAll)
74+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchAll)
7475
})
7576
b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
76-
subtestDistributeAndFetch(b, 10, 100, allToAll, fetchAllConcurrent)
77+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, fetchAllConcurrent)
7778
})
7879
b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
79-
subtestDistributeAndFetch(b, 10, 100, allToAll, unixfsFileFetch)
80+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, unixfsFileFetch)
8081
})
8182
b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
82-
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, oneAtATime)
83+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, oneAtATime)
8384
})
8485
b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
85-
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, batchFetchAll)
86+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, batchFetchAll)
8687
})
8788
b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
88-
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, unixfsFileFetch)
89+
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, unixfsFileFetch)
8990
})
9091
b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
91-
subtestDistributeAndFetch(b, 200, 20, allToAll, batchFetchAll)
92+
subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
9293
})
93-
9494
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
9595
ioutil.WriteFile("benchmark.json", out, 0666)
9696
}
9797

98-
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, df distFunc, ff fetchFunc) {
98+
const fastSpeed = 60 * time.Millisecond
99+
const mediumSpeed = 200 * time.Millisecond
100+
const slowSpeed = 800 * time.Millisecond
101+
const superSlowSpeed = 4000 * time.Millisecond
102+
const distribution = 20 * time.Millisecond
103+
104+
func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
105+
fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
106+
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
107+
0.0, 0.0, distribution, nil)
108+
fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
109+
averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
110+
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
111+
0.3, 0.3, distribution, nil)
112+
averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
113+
slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
114+
mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
115+
0.3, 0.3, distribution, nil)
116+
slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
117+
118+
b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
119+
subtestDistributeAndFetch(b, 300, 200, fastNetworkDelay, allToAll, batchFetchAll)
120+
})
121+
b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
122+
subtestDistributeAndFetch(b, 300, 200, averageNetworkDelay, allToAll, batchFetchAll)
123+
})
124+
b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
125+
subtestDistributeAndFetch(b, 300, 200, slowNetworkDelay, allToAll, batchFetchAll)
126+
})
127+
}
128+
129+
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
99130
start := time.Now()
100-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
131+
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
101132
sg := NewTestSessionGenerator(net)
102133
defer sg.Close()
103134

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package bitswap
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
7+
"github.com/ipfs/go-ipfs-delay"
8+
)
9+
10+
var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano()))
11+
12+
// InternetLatencyDelayGenerator generates three clusters of delays,
13+
// typical of the type of peers you would encounter on the interenet
14+
// Given a base delay time T, the wait time generated will be either:
15+
// 1. A normalized distribution around the base time
16+
// 2. A normalized distribution around the base time plus a "medium" delay
17+
// 3. A normalized distribution around the base time plus a "large" delay
18+
// The size of the medium & large delays are determined when the generator
19+
// is constructed, as well as the relative percentages with which delays fall
20+
// into each of the three different clusters, and the standard deviation for
21+
// the normalized distribution
22+
// This can be used to generate a number of scenarios typical of latency
23+
// distribution among peers on the internet
24+
func InternetLatencyDelayGenerator(
25+
mediumDelay time.Duration,
26+
largeDelay time.Duration,
27+
percentMedium float64,
28+
percentLarge float64,
29+
std time.Duration,
30+
rng *rand.Rand) delay.Generator {
31+
if rng == nil {
32+
rng = sharedRNG
33+
}
34+
35+
return &internetLatencyDelayGenerator{
36+
mediumDelay: mediumDelay,
37+
largeDelay: largeDelay,
38+
percentLarge: percentLarge,
39+
percentMedium: percentMedium,
40+
std: std,
41+
rng: rng,
42+
}
43+
}
44+
45+
type internetLatencyDelayGenerator struct {
46+
mediumDelay time.Duration
47+
largeDelay time.Duration
48+
percentLarge float64
49+
percentMedium float64
50+
std time.Duration
51+
rng *rand.Rand
52+
}
53+
54+
func (d *internetLatencyDelayGenerator) NextWaitTime(t time.Duration) time.Duration {
55+
clusterDistribution := d.rng.Float64()
56+
baseDelay := time.Duration(d.rng.NormFloat64()*float64(d.std)) + t
57+
if clusterDistribution < d.percentLarge {
58+
return baseDelay + d.largeDelay
59+
} else if clusterDistribution < d.percentMedium+d.percentLarge {
60+
return baseDelay + d.mediumDelay
61+
}
62+
return baseDelay
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package bitswap
2+
3+
import (
4+
"math"
5+
"math/rand"
6+
"testing"
7+
"time"
8+
)
9+
10+
const testSeed = 99
11+
12+
func TestInternetLatencyDelayNextWaitTimeDistribution(t *testing.T) {
13+
initialValue := 1000 * time.Millisecond
14+
deviation := 100 * time.Millisecond
15+
mediumDelay := 1000 * time.Millisecond
16+
largeDelay := 3000 * time.Millisecond
17+
percentMedium := 0.2
18+
percentLarge := 0.4
19+
buckets := make(map[string]int)
20+
internetLatencyDistributionDelay := InternetLatencyDelayGenerator(
21+
mediumDelay,
22+
largeDelay,
23+
percentMedium,
24+
percentLarge,
25+
deviation,
26+
rand.New(rand.NewSource(testSeed)))
27+
28+
buckets["fast"] = 0
29+
buckets["medium"] = 0
30+
buckets["slow"] = 0
31+
buckets["outside_1_deviation"] = 0
32+
33+
// strategy here is rather than mock randomness, just use enough samples to
34+
// get approximately the distribution you'd expect
35+
for i := 0; i < 10000; i++ {
36+
next := internetLatencyDistributionDelay.NextWaitTime(initialValue)
37+
if math.Abs((next - initialValue).Seconds()) <= deviation.Seconds() {
38+
buckets["fast"]++
39+
} else if math.Abs((next - initialValue - mediumDelay).Seconds()) <= deviation.Seconds() {
40+
buckets["medium"]++
41+
} else if math.Abs((next - initialValue - largeDelay).Seconds()) <= deviation.Seconds() {
42+
buckets["slow"]++
43+
} else {
44+
buckets["outside_1_deviation"]++
45+
}
46+
}
47+
totalInOneDeviation := float64(10000 - buckets["outside_1_deviation"])
48+
oneDeviationPercentage := totalInOneDeviation / 10000
49+
fastPercentageResult := float64(buckets["fast"]) / totalInOneDeviation
50+
mediumPercentageResult := float64(buckets["medium"]) / totalInOneDeviation
51+
slowPercentageResult := float64(buckets["slow"]) / totalInOneDeviation
52+
53+
// see 68-95-99 rule for normal distributions
54+
if math.Abs(oneDeviationPercentage-0.6827) >= 0.1 {
55+
t.Fatal("Failed to distribute values normally based on standard deviation")
56+
}
57+
58+
if math.Abs(fastPercentageResult+percentMedium+percentLarge-1) >= 0.1 {
59+
t.Fatal("Incorrect percentage of values distributed around fast delay time")
60+
}
61+
62+
if math.Abs(mediumPercentageResult-percentMedium) >= 0.1 {
63+
t.Fatal("Incorrect percentage of values distributed around medium delay time")
64+
}
65+
66+
if math.Abs(slowPercentageResult-percentLarge) >= 0.1 {
67+
t.Fatal("Incorrect percentage of values distributed around slow delay time")
68+
}
69+
}

bitswap/testnet/virtual.go

+39-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package bitswap
33
import (
44
"context"
55
"errors"
6+
"sort"
67
"sync"
78
"sync/atomic"
89
"time"
@@ -24,6 +25,7 @@ var log = logging.Logger("bstestnet")
2425

2526
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
2627
return &network{
28+
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
2729
clients: make(map[peer.ID]*receiverQueue),
2830
delay: d,
2931
routingserver: rs,
@@ -33,6 +35,7 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
3335

3436
type network struct {
3537
mu sync.Mutex
38+
latencies map[peer.ID]map[peer.ID]time.Duration
3639
clients map[peer.ID]*receiverQueue
3740
routingserver mockrouting.Server
3841
delay delay.D
@@ -87,6 +90,18 @@ func (n *network) SendMessage(
8790
n.mu.Lock()
8891
defer n.mu.Unlock()
8992

93+
latencies, ok := n.latencies[from]
94+
if !ok {
95+
latencies = make(map[peer.ID]time.Duration)
96+
n.latencies[from] = latencies
97+
}
98+
99+
latency, ok := latencies[to]
100+
if !ok {
101+
latency = n.delay.NextWaitTime()
102+
latencies[to] = latency
103+
}
104+
90105
receiver, ok := n.clients[to]
91106
if !ok {
92107
return errors.New("cannot locate peer on network")
@@ -98,7 +113,7 @@ func (n *network) SendMessage(
98113
msg := &message{
99114
from: from,
100115
msg: mes,
101-
shouldSend: time.Now().Add(n.delay.Get()),
116+
shouldSend: time.Now().Add(latency),
102117
}
103118
receiver.enqueue(msg)
104119

@@ -229,21 +244,38 @@ func (rq *receiverQueue) enqueue(m *message) {
229244
}
230245
}
231246

247+
func (rq *receiverQueue) Swap(i, j int) {
248+
rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i]
249+
}
250+
251+
func (rq *receiverQueue) Len() int {
252+
return len(rq.queue)
253+
}
254+
255+
func (rq *receiverQueue) Less(i, j int) bool {
256+
return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano()
257+
}
258+
232259
func (rq *receiverQueue) process() {
233260
for {
234261
rq.lk.Lock()
262+
sort.Sort(rq)
235263
if len(rq.queue) == 0 {
236264
rq.active = false
237265
rq.lk.Unlock()
238266
return
239267
}
240268
m := rq.queue[0]
241-
rq.queue = rq.queue[1:]
242-
rq.lk.Unlock()
243-
244-
time.Sleep(time.Until(m.shouldSend))
245-
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
246-
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
269+
if time.Until(m.shouldSend).Seconds() < 0.1 {
270+
rq.queue = rq.queue[1:]
271+
rq.lk.Unlock()
272+
time.Sleep(time.Until(m.shouldSend))
273+
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
274+
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
275+
} else {
276+
rq.lk.Unlock()
277+
time.Sleep(100 * time.Millisecond)
278+
}
247279
}
248280
}
249281

0 commit comments

Comments
 (0)