Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit fe0a253

Browse files
committed
test(Benchmarks): Add bandwidth restrictions
Limits connection bandwidth in real world benchmarks so that blocks are delayed if single peer is overused fix #40
1 parent 5c7498c commit fe0a253

File tree

5 files changed

+165
-16
lines changed

5 files changed

+165
-16
lines changed

dup_blocks_test.go benchmarks_test.go

+35-4
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ import (
99
"testing"
1010
"time"
1111

12-
tn "github.com/ipfs/go-bitswap/testnet"
12+
"github.com/ipfs/go-bitswap/testutil"
1313

1414
bssession "github.com/ipfs/go-bitswap/session"
15+
tn "github.com/ipfs/go-bitswap/testnet"
1516
"github.com/ipfs/go-block-format"
1617
cid "github.com/ipfs/go-cid"
1718
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
@@ -102,30 +103,40 @@ const mediumSpeed = 200 * time.Millisecond
102103
const slowSpeed = 800 * time.Millisecond
103104
const superSlowSpeed = 4000 * time.Millisecond
104105
const distribution = 20 * time.Millisecond
106+
const fastBandwidth = 1250000.0
107+
const fastBandwidthDeviation = 300000.0
108+
const mediumBandwidth = 500000.0
109+
const mediumBandwidthDeviation = 80000.0
110+
const slowBandwidth = 100000.0
111+
const slowBandwidthDeviation = 16500.0
112+
const stdBlockSize = 8000
105113

106114
func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
107115
benchmarkLog = nil
108116
fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
109117
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
110118
0.0, 0.0, distribution, nil)
111119
fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
120+
fastBandwidthGenerator := tn.VariableRateLimitGenerator(fastBandwidth, fastBandwidthDeviation, nil)
112121
averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
113122
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
114123
0.3, 0.3, distribution, nil)
115124
averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
125+
averageBandwidthGenerator := tn.VariableRateLimitGenerator(mediumBandwidth, mediumBandwidthDeviation, nil)
116126
slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
117127
mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
118128
0.3, 0.3, distribution, nil)
119129
slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)
130+
slowBandwidthGenerator := tn.VariableRateLimitGenerator(slowBandwidth, slowBandwidthDeviation, nil)
120131

121132
b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
122-
subtestDistributeAndFetch(b, 300, 200, fastNetworkDelay, allToAll, batchFetchAll)
133+
subtestDistributeAndFetchRateLimited(b, 300, 200, fastNetworkDelay, fastBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
123134
})
124135
b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
125-
subtestDistributeAndFetch(b, 300, 200, averageNetworkDelay, allToAll, batchFetchAll)
136+
subtestDistributeAndFetchRateLimited(b, 300, 200, averageNetworkDelay, averageBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
126137
})
127138
b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
128-
subtestDistributeAndFetch(b, 300, 200, slowNetworkDelay, allToAll, batchFetchAll)
139+
subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, allToAll, batchFetchAll)
129140
})
130141
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
131142
ioutil.WriteFile("tmp/rw-benchmark.json", out, 0666)
@@ -134,13 +145,33 @@ func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
134145
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
135146
start := time.Now()
136147
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
148+
137149
sg := NewTestSessionGenerator(net)
138150
defer sg.Close()
139151

140152
bg := blocksutil.NewBlockGenerator()
141153

142154
instances := sg.Instances(numnodes)
143155
blocks := bg.Blocks(numblks)
156+
runDistribution(b, instances, blocks, df, ff, start)
157+
}
158+
159+
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, df distFunc, ff fetchFunc) {
160+
start := time.Now()
161+
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
162+
163+
sg := NewTestSessionGenerator(net)
164+
defer sg.Close()
165+
166+
instances := sg.Instances(numnodes)
167+
blocks := testutil.GenerateBlocksOfSize(numblks, blockSize)
168+
169+
runDistribution(b, instances, blocks, df, ff, start)
170+
}
171+
172+
func runDistribution(b *testing.B, instances []Instance, blocks []blocks.Block, df distFunc, ff fetchFunc, start time.Time) {
173+
174+
numnodes := len(instances)
144175

145176
fetcher := instances[numnodes-1]
146177

package.json

+6
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,12 @@
184184
"hash": "Qmf7HqcW7LtCi1W8y2bdx2eJpze74jkbKqpByxgXikdbLF",
185185
"name": "go-detect-race",
186186
"version": "1.0.1"
187+
},
188+
{
189+
"author": "jbenet",
190+
"hash": "QmSJ9n2s9NUoA9D849W5jj5SJ94nMcZpj1jCgQJieiNqSt",
191+
"name": "go-random",
192+
"version": "1.0.0"
187193
}
188194
],
189195
"gxVersion": "0.12.1",

testnet/rate_limit_generators.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package bitswap
2+
3+
import (
4+
"math/rand"
5+
)
6+
7+
type fixedRateLimitGenerator struct {
8+
rateLimit float64
9+
}
10+
11+
// FixedRateLimitGenerator returns a rate limit generatoe that always generates
12+
// the specified rate limit in bytes/sec.
13+
func FixedRateLimitGenerator(rateLimit float64) RateLimitGenerator {
14+
return &fixedRateLimitGenerator{rateLimit}
15+
}
16+
17+
func (rateLimitGenerator *fixedRateLimitGenerator) NextRateLimit() float64 {
18+
return rateLimitGenerator.rateLimit
19+
}
20+
21+
type variableRateLimitGenerator struct {
22+
rateLimit float64
23+
std float64
24+
rng *rand.Rand
25+
}
26+
27+
// VariableRateLimitGenerator makes rate limites that following a normal distribution.
28+
func VariableRateLimitGenerator(rateLimit float64, std float64, rng *rand.Rand) RateLimitGenerator {
29+
if rng == nil {
30+
rng = sharedRNG
31+
}
32+
33+
return &variableRateLimitGenerator{
34+
std: std,
35+
rng: rng,
36+
rateLimit: rateLimit,
37+
}
38+
}
39+
40+
func (rateLimitGenerator *variableRateLimitGenerator) NextRateLimit() float64 {
41+
return rateLimitGenerator.rng.NormFloat64()*rateLimitGenerator.std + rateLimitGenerator.rateLimit
42+
}

testnet/virtual.go

+59-12
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,55 @@ import (
1818
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
1919
peer "github.com/libp2p/go-libp2p-peer"
2020
routing "github.com/libp2p/go-libp2p-routing"
21+
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
2122
testutil "github.com/libp2p/go-testutil"
2223
)
2324

2425
var log = logging.Logger("bstestnet")
2526

2627
func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
2728
return &network{
28-
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
29-
clients: make(map[peer.ID]*receiverQueue),
30-
delay: d,
31-
routingserver: rs,
32-
conns: make(map[string]struct{}),
29+
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
30+
clients: make(map[peer.ID]*receiverQueue),
31+
delay: d,
32+
routingserver: rs,
33+
isRateLimited: false,
34+
rateLimitGenerator: nil,
35+
conns: make(map[string]struct{}),
36+
}
37+
}
38+
39+
type rateLimiter interface {
40+
Limit(dataSize int) time.Duration
41+
}
42+
43+
type RateLimitGenerator interface {
44+
NextRateLimit() float64
45+
}
46+
47+
func RateLimitedVirtualNetwork(rs mockrouting.Server, d delay.D, rateLimitGenerator RateLimitGenerator) Network {
48+
return &network{
49+
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
50+
rateLimiters: make(map[peer.ID]map[peer.ID]rateLimiter),
51+
clients: make(map[peer.ID]*receiverQueue),
52+
delay: d,
53+
routingserver: rs,
54+
isRateLimited: true,
55+
rateLimitGenerator: rateLimitGenerator,
56+
conns: make(map[string]struct{}),
3357
}
3458
}
3559

3660
type network struct {
37-
mu sync.Mutex
38-
latencies map[peer.ID]map[peer.ID]time.Duration
39-
clients map[peer.ID]*receiverQueue
40-
routingserver mockrouting.Server
41-
delay delay.D
42-
conns map[string]struct{}
61+
mu sync.Mutex
62+
latencies map[peer.ID]map[peer.ID]time.Duration
63+
rateLimiters map[peer.ID]map[peer.ID]rateLimiter
64+
clients map[peer.ID]*receiverQueue
65+
routingserver mockrouting.Server
66+
delay delay.D
67+
isRateLimited bool
68+
rateLimitGenerator RateLimitGenerator
69+
conns map[string]struct{}
4370
}
4471

4572
type message struct {
@@ -102,6 +129,26 @@ func (n *network) SendMessage(
102129
latencies[to] = latency
103130
}
104131

132+
var bandwidthDelay time.Duration
133+
if n.isRateLimited {
134+
rateLimiters, ok := n.rateLimiters[from]
135+
if !ok {
136+
rateLimiters = make(map[peer.ID]rateLimiter)
137+
n.rateLimiters[from] = rateLimiters
138+
}
139+
140+
rl, ok := rateLimiters[to]
141+
if !ok {
142+
rl = mocknet.NewRatelimiter(n.rateLimitGenerator.NextRateLimit())
143+
rateLimiters[to] = rl
144+
}
145+
146+
size := mes.ToProtoV1().Size()
147+
bandwidthDelay = rl.Limit(size)
148+
} else {
149+
bandwidthDelay = 0
150+
}
151+
105152
receiver, ok := n.clients[to]
106153
if !ok {
107154
return errors.New("cannot locate peer on network")
@@ -113,7 +160,7 @@ func (n *network) SendMessage(
113160
msg := &message{
114161
from: from,
115162
msg: mes,
116-
shouldSend: time.Now().Add(latency),
163+
shouldSend: time.Now().Add(latency).Add(bandwidthDelay),
117164
}
118165
receiver.enqueue(msg)
119166

testutil/testutil.go

+23
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package testutil
22

33
import (
4+
"bytes"
5+
6+
random "github.com/jbenet/go-random"
7+
48
bsmsg "github.com/ipfs/go-bitswap/message"
59
"github.com/ipfs/go-bitswap/wantlist"
610
"github.com/ipfs/go-block-format"
@@ -11,6 +15,25 @@ import (
1115

1216
var blockGenerator = blocksutil.NewBlockGenerator()
1317
var prioritySeq int
18+
var seedSeq int64
19+
20+
func randomBytes(n int64, seed int64) []byte {
21+
data := new(bytes.Buffer)
22+
random.WritePseudoRandomBytes(n, data, seed)
23+
return data.Bytes()
24+
}
25+
26+
// GenerateBlocksOfSize generates a series of blocks of the given byte size
27+
func GenerateBlocksOfSize(n int, size int64) []blocks.Block {
28+
generatedBlocks := make([]blocks.Block, 0, n)
29+
for i := 0; i < n; i++ {
30+
seedSeq++
31+
b := blocks.NewBlock(randomBytes(size, seedSeq))
32+
generatedBlocks = append(generatedBlocks, b)
33+
34+
}
35+
return generatedBlocks
36+
}
1437

1538
// GenerateCids produces n content identifiers.
1639
func GenerateCids(n int) []cid.Cid {

0 commit comments

Comments
 (0)