This repository was archived by the owner on Feb 1, 2023. It is now read-only.

feat(Benchmarks): Add real world dup blocks test #25

Merged
merged 1 commit into from Nov 15, 2018
73 changes: 52 additions & 21 deletions dup_blocks_test.go
@@ -33,71 +33,102 @@ type runStats struct {
var benchmarkLog []runStats

func BenchmarkDups2Nodes(b *testing.B) {
fixedDelay := delay.Fixed(10 * time.Millisecond)
b.Run("AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, oneAtATime)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
})
b.Run("AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, batchFetchAll)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, batchFetchAll)
})

b.Run("Overlap1-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap1, oneAtATime)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap1, oneAtATime)
})

b.Run("Overlap2-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap2, batchFetchBy10)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap2, batchFetchBy10)
})

b.Run("Overlap3-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, oneAtATime)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, oneAtATime)
})
b.Run("Overlap3-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchBy10)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchBy10)
})
b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, fetchAllConcurrent)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, fetchAllConcurrent)
})
b.Run("Overlap3-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchAll)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchAll)
})
b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, unixfsFileFetch)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, unixfsFileFetch)
})
b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, oneAtATime)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, oneAtATime)
})
b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchBy10)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchBy10)
})
b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchAll)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchAll)
})
b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, fetchAllConcurrent)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, fetchAllConcurrent)
})
b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, unixfsFileFetch)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, unixfsFileFetch)
})
b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, oneAtATime)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, oneAtATime)
})
b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, batchFetchAll)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, batchFetchAll)
})
b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, unixfsFileFetch)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, unixfsFileFetch)
})
b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 200, 20, allToAll, batchFetchAll)
subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
})

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("benchmark.json", out, 0666)
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, df distFunc, ff fetchFunc) {
const fastSpeed = 60 * time.Millisecond
const mediumSpeed = 200 * time.Millisecond
const slowSpeed = 800 * time.Millisecond
const superSlowSpeed = 4000 * time.Millisecond
const distribution = 20 * time.Millisecond

func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.0, 0.0, distribution, nil)
fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.3, 0.3, distribution, nil)
averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
0.3, 0.3, distribution, nil)
slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)

b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, fastNetworkDelay, allToAll, batchFetchAll)
})
b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, averageNetworkDelay, allToAll, batchFetchAll)
})
b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, slowNetworkDelay, allToAll, batchFetchAll)
})
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
sg := NewTestSessionGenerator(net)
defer sg.Close()

63 changes: 63 additions & 0 deletions testnet/internet_latency_delay_generator.go
@@ -0,0 +1,63 @@
package bitswap

import (
"math/rand"
"time"

"github.com/ipfs/go-ipfs-delay"
)

var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano()))

// InternetLatencyDelayGenerator generates three clusters of delays,
// typical of the types of peers you would encounter on the internet.
// Given a base delay time T, the wait time generated will be either:
// 1. A normal distribution around the base time
// 2. A normal distribution around the base time plus a "medium" delay
// 3. A normal distribution around the base time plus a "large" delay
// The size of the medium and large delays is determined when the generator
// is constructed, along with the relative percentages with which delays fall
// into each of the three clusters and the standard deviation of the
// normal distribution.
// This can be used to generate a number of scenarios typical of latency
// distribution among peers on the internet.
func InternetLatencyDelayGenerator(
mediumDelay time.Duration,
largeDelay time.Duration,
percentMedium float64,
percentLarge float64,
std time.Duration,
rng *rand.Rand) delay.Generator {
if rng == nil {
rng = sharedRNG
}

return &internetLatencyDelayGenerator{
mediumDelay: mediumDelay,
largeDelay: largeDelay,
percentLarge: percentLarge,
percentMedium: percentMedium,
std: std,
rng: rng,
}
}

type internetLatencyDelayGenerator struct {
mediumDelay time.Duration
largeDelay time.Duration
percentLarge float64
percentMedium float64
std time.Duration
rng *rand.Rand
}

func (d *internetLatencyDelayGenerator) NextWaitTime(t time.Duration) time.Duration {
clusterDistribution := d.rng.Float64()
baseDelay := time.Duration(d.rng.NormFloat64()*float64(d.std)) + t
if clusterDistribution < d.percentLarge {
return baseDelay + d.largeDelay
} else if clusterDistribution < d.percentMedium+d.percentLarge {
return baseDelay + d.mediumDelay
}
return baseDelay
}
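
For orientation, here is a minimal sketch of how this generator can be wired into the virtual network, mirroring the way dup_blocks_test.go builds its "average" delay profile above. The exampleDelay helper and the specific parameter values are illustrative, not part of this change:

package bitswap

import (
	"math/rand"
	"time"

	"github.com/ipfs/go-ipfs-delay"
)

// exampleDelay builds a delay.D whose wait times cluster around a 60ms base
// latency; roughly 30% of waits carry an extra ~140ms ("medium") and 30% an
// extra ~740ms ("large"), each spread with a 20ms standard deviation.
func exampleDelay() delay.D {
	gen := InternetLatencyDelayGenerator(
		140*time.Millisecond, // medium extra delay
		740*time.Millisecond, // large extra delay
		0.3,                  // fraction of waits in the medium cluster
		0.3,                  // fraction of waits in the large cluster
		20*time.Millisecond,  // standard deviation within each cluster
		rand.New(rand.NewSource(1)))
	// delay.Delay pairs a base duration with a generator; the resulting
	// delay.D is what subtestDistributeAndFetch and tn.VirtualNetwork consume.
	return delay.Delay(60*time.Millisecond, gen)
}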
69 changes: 69 additions & 0 deletions testnet/internet_latency_delay_generator_test.go
@@ -0,0 +1,69 @@
package bitswap

import (
"math"
"math/rand"
"testing"
"time"
)

const testSeed = 99

func TestInternetLatencyDelayNextWaitTimeDistribution(t *testing.T) {
initialValue := 1000 * time.Millisecond
deviation := 100 * time.Millisecond
mediumDelay := 1000 * time.Millisecond
largeDelay := 3000 * time.Millisecond
percentMedium := 0.2
percentLarge := 0.4
buckets := make(map[string]int)
internetLatencyDistributionDelay := InternetLatencyDelayGenerator(
mediumDelay,
largeDelay,
percentMedium,
percentLarge,
deviation,
rand.New(rand.NewSource(testSeed)))

buckets["fast"] = 0
buckets["medium"] = 0
buckets["slow"] = 0
buckets["outside_1_deviation"] = 0

// the strategy here is, rather than mocking randomness, to just use enough
// samples to get approximately the distribution you'd expect
for i := 0; i < 10000; i++ {
next := internetLatencyDistributionDelay.NextWaitTime(initialValue)
if math.Abs((next - initialValue).Seconds()) <= deviation.Seconds() {
buckets["fast"]++
} else if math.Abs((next - initialValue - mediumDelay).Seconds()) <= deviation.Seconds() {
buckets["medium"]++
} else if math.Abs((next - initialValue - largeDelay).Seconds()) <= deviation.Seconds() {
buckets["slow"]++
} else {
buckets["outside_1_deviation"]++
}
}
totalInOneDeviation := float64(10000 - buckets["outside_1_deviation"])
oneDeviationPercentage := totalInOneDeviation / 10000
fastPercentageResult := float64(buckets["fast"]) / totalInOneDeviation
mediumPercentageResult := float64(buckets["medium"]) / totalInOneDeviation
slowPercentageResult := float64(buckets["slow"]) / totalInOneDeviation

// see the 68-95-99.7 rule for normal distributions
if math.Abs(oneDeviationPercentage-0.6827) >= 0.1 {
t.Fatal("Failed to distribute values normally based on standard deviation")
}

if math.Abs(fastPercentageResult+percentMedium+percentLarge-1) >= 0.1 {
t.Fatal("Incorrect percentage of values distributed around fast delay time")
}

if math.Abs(mediumPercentageResult-percentMedium) >= 0.1 {
t.Fatal("Incorrect percentage of values distributed around medium delay time")
}

if math.Abs(slowPercentageResult-percentLarge) >= 0.1 {
t.Fatal("Incorrect percentage of values distributed around slow delay time")
}
}
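
As an informal cross-check of the tolerances above (a back-of-the-envelope note, not part of the change), the expected proportions work out roughly as follows:

P(large cluster)  = percentLarge    = 0.4
P(medium cluster) = percentMedium   = 0.2
P(fast cluster)   = 1 - 0.2 - 0.4   = 0.4
Within one standard deviation of a cluster mean: about 68.27% of samples (68-95-99.7 rule),
so oneDeviationPercentage should land near 0.68 and the fast/medium/slow split
near 0.4 / 0.2 / 0.4, well inside the ±0.1 tolerances the test allows.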
46 changes: 39 additions & 7 deletions testnet/virtual.go
@@ -3,6 +3,7 @@ package bitswap
import (
"context"
"errors"
"sort"
"sync"
"sync/atomic"
"time"
@@ -24,6 +25,7 @@ var log = logging.Logger("bstestnet")

func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
return &network{
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
clients: make(map[peer.ID]*receiverQueue),
delay: d,
routingserver: rs,
@@ -33,6 +35,7 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {

type network struct {
mu sync.Mutex
latencies map[peer.ID]map[peer.ID]time.Duration
clients map[peer.ID]*receiverQueue
routingserver mockrouting.Server
delay delay.D
@@ -87,6 +90,18 @@ func (n *network) SendMessage(
n.mu.Lock()
defer n.mu.Unlock()

latencies, ok := n.latencies[from]
if !ok {
latencies = make(map[peer.ID]time.Duration)
n.latencies[from] = latencies
}

latency, ok := latencies[to]
if !ok {
latency = n.delay.NextWaitTime()
latencies[to] = latency
}

receiver, ok := n.clients[to]
if !ok {
return errors.New("cannot locate peer on network")
@@ -98,7 +113,7 @@
msg := &message{
from: from,
msg: mes,
shouldSend: time.Now().Add(n.delay.Get()),
shouldSend: time.Now().Add(latency),
}
receiver.enqueue(msg)

@@ -229,21 +244,38 @@ func (rq *receiverQueue) enqueue(m *message) {
}
}

func (rq *receiverQueue) Swap(i, j int) {
rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i]
}

func (rq *receiverQueue) Len() int {
return len(rq.queue)
}

func (rq *receiverQueue) Less(i, j int) bool {
return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano()
}

func (rq *receiverQueue) process() {
for {
rq.lk.Lock()
sort.Sort(rq)
if len(rq.queue) == 0 {
rq.active = false
rq.lk.Unlock()
return
}
m := rq.queue[0]
rq.queue = rq.queue[1:]
rq.lk.Unlock()

time.Sleep(time.Until(m.shouldSend))
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
if time.Until(m.shouldSend).Seconds() < 0.1 {
rq.queue = rq.queue[1:]
rq.lk.Unlock()
time.Sleep(time.Until(m.shouldSend))
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
} else {
rq.lk.Unlock()
time.Sleep(100 * time.Millisecond)
}
}
}
