Skip to content

Commit b8d21ac

Browse files
burmanm authored and yurishkuro committed
Reduce memory pressure in badger write-path key creation (#1771)
Small memory optimizations in badger write-path pressure. Also, add some benchmarks for easier profiling to improve performance in the future. Signed-off-by: Michael Burman <yak@iki.fi>
1 parent ff3d3c9 commit b8d21ac

File tree

6 files changed

+283
-87
lines changed

6 files changed

+283
-87
lines changed

plugin/storage/badger/spanstore/cache.go

+16-17
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ import (
2626
type CacheStore struct {
2727
// Given the small amount of data these will store, we use the same structure as the memory store
2828
cacheLock sync.Mutex // write heavy - Mutex is faster than RWMutex for writes
29-
services map[string]int64
30-
operations map[string]map[string]int64
29+
services map[string]uint64
30+
operations map[string]map[string]uint64
3131

3232
store *badger.DB
3333
ttl time.Duration
@@ -36,8 +36,8 @@ type CacheStore struct {
3636
// NewCacheStore returns initialized CacheStore for badger use
3737
func NewCacheStore(db *badger.DB, ttl time.Duration, prefill bool) *CacheStore {
3838
cs := &CacheStore{
39-
services: make(map[string]int64),
40-
operations: make(map[string]map[string]int64),
39+
services: make(map[string]uint64),
40+
operations: make(map[string]map[string]uint64),
4141
ttl: ttl,
4242
store: db,
4343
}
@@ -71,7 +71,7 @@ func (c *CacheStore) loadServices() {
7171
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
7272
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
7373
serviceName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
74-
keyTTL := int64(it.Item().ExpiresAt())
74+
keyTTL := it.Item().ExpiresAt()
7575
if v, found := c.services[serviceName]; found {
7676
if v > keyTTL {
7777
continue
@@ -89,17 +89,17 @@ func (c *CacheStore) loadOperations(service string) {
8989
it := txn.NewIterator(opts)
9090
defer it.Close()
9191

92-
serviceKey := make([]byte, 0, len(service)+1)
93-
serviceKey = append(serviceKey, operationNameIndexKey)
94-
serviceKey = append(serviceKey, service...)
92+
serviceKey := make([]byte, len(service)+1)
93+
serviceKey[0] = operationNameIndexKey
94+
copy(serviceKey[1:], service)
9595

9696
// Seek all the services first
9797
for it.Seek(serviceKey); it.ValidForPrefix(serviceKey); it.Next() {
9898
timestampStartIndex := len(it.Item().Key()) - (sizeOfTraceID + 8) // 8 = sizeof(uint64)
9999
operationName := string(it.Item().Key()[len(serviceKey):timestampStartIndex])
100-
keyTTL := int64(it.Item().ExpiresAt())
100+
keyTTL := it.Item().ExpiresAt()
101101
if _, found := c.operations[service]; !found {
102-
c.operations[service] = make(map[string]int64)
102+
c.operations[service] = make(map[string]uint64)
103103
}
104104

105105
if v, found := c.operations[service][operationName]; found {
@@ -114,22 +114,21 @@ func (c *CacheStore) loadOperations(service string) {
114114
}
115115

116116
// Update caches the results of service and service + operation indexes and maintains their TTL
117-
func (c *CacheStore) Update(service string, operation string) {
117+
func (c *CacheStore) Update(service, operation string, expireTime uint64) {
118118
c.cacheLock.Lock()
119-
t := time.Now().Add(c.ttl).Unix()
120119

121-
c.services[service] = t
120+
c.services[service] = expireTime
122121
if _, ok := c.operations[service]; !ok {
123-
c.operations[service] = make(map[string]int64)
122+
c.operations[service] = make(map[string]uint64)
124123
}
125-
c.operations[service][operation] = t
124+
c.operations[service][operation] = expireTime
126125
c.cacheLock.Unlock()
127126
}
128127

129128
// GetOperations returns all operations for a specific service traced by Jaeger
130129
func (c *CacheStore) GetOperations(service string) ([]string, error) {
131130
operations := make([]string, 0, len(c.services))
132-
t := time.Now().Unix()
131+
t := uint64(time.Now().Unix())
133132
c.cacheLock.Lock()
134133
defer c.cacheLock.Unlock()
135134

@@ -157,7 +156,7 @@ func (c *CacheStore) GetOperations(service string) ([]string, error) {
157156
// GetServices returns all services traced by Jaeger
158157
func (c *CacheStore) GetServices() ([]string, error) {
159158
services := make([]string, 0, len(c.services))
160-
t := time.Now().Unix()
159+
t := uint64(time.Now().Unix())
161160
c.cacheLock.Lock()
162161
// Fetch the items
163162
for k, v := range c.services {

plugin/storage/badger/spanstore/cache_test.go

+17-14
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,32 @@ func TestExpiredItems(t *testing.T) {
3333
runWithBadger(t, func(store *badger.DB, t *testing.T) {
3434
cache := NewCacheStore(store, time.Duration(-1*time.Hour), false)
3535

36+
expireTime := uint64(time.Now().Add(cache.ttl).Unix())
37+
3638
// Expired service
3739

38-
cache.Update("service1", "op1")
39-
cache.Update("service1", "op2")
40+
cache.Update("service1", "op1", expireTime)
41+
cache.Update("service1", "op2", expireTime)
4042

4143
services, err := cache.GetServices()
4244
assert.NoError(t, err)
4345
assert.Equal(t, 0, len(services)) // Everything should be expired
4446

4547
// Expired service for operations
4648

47-
cache.Update("service1", "op1")
48-
cache.Update("service1", "op2")
49+
cache.Update("service1", "op1", expireTime)
50+
cache.Update("service1", "op2", expireTime)
4951

5052
operations, err := cache.GetOperations("service1")
5153
assert.NoError(t, err)
5254
assert.Equal(t, 0, len(operations)) // Everything should be expired
5355

5456
// Expired operations, stable service
5557

56-
cache.Update("service1", "op1")
57-
cache.Update("service1", "op2")
58+
cache.Update("service1", "op1", expireTime)
59+
cache.Update("service1", "op2", expireTime)
5860

59-
cache.services["service1"] = time.Now().Unix() + 1e10
61+
cache.services["service1"] = uint64(time.Now().Unix() + 1e10)
6062

6163
operations, err = cache.GetOperations("service1")
6264
assert.NoError(t, err)
@@ -66,8 +68,9 @@ func TestExpiredItems(t *testing.T) {
6668

6769
func TestOldReads(t *testing.T) {
6870
runWithBadger(t, func(store *badger.DB, t *testing.T) {
69-
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), time.Now(), model.TraceID{High: 0, Low: 0})
70-
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), time.Now(), model.TraceID{High: 0, Low: 0})
71+
timeNow := model.TimeAsEpochMicroseconds(time.Now())
72+
s1Key := createIndexKey(serviceNameIndexKey, []byte("service1"), timeNow, model.TraceID{High: 0, Low: 0})
73+
s1o1Key := createIndexKey(operationNameIndexKey, []byte("service1operation1"), timeNow, model.TraceID{High: 0, Low: 0})
7174

7275
tid := time.Now().Add(1 * time.Minute)
7376

@@ -90,15 +93,15 @@ func TestOldReads(t *testing.T) {
9093

9194
nuTid := tid.Add(1 * time.Hour)
9295

93-
cache.Update("service1", "operation1")
94-
cache.services["service1"] = nuTid.Unix()
95-
cache.operations["service1"]["operation1"] = nuTid.Unix()
96+
cache.Update("service1", "operation1", uint64(tid.Unix()))
97+
cache.services["service1"] = uint64(nuTid.Unix())
98+
cache.operations["service1"]["operation1"] = uint64(nuTid.Unix())
9699

97100
cache.populateCaches()
98101

99102
// Now make sure we didn't use the older timestamps from the DB
100-
assert.Equal(t, nuTid.Unix(), cache.services["service1"])
101-
assert.Equal(t, nuTid.Unix(), cache.operations["service1"]["operation1"])
103+
assert.Equal(t, uint64(nuTid.Unix()), cache.services["service1"])
104+
assert.Equal(t, uint64(nuTid.Unix()), cache.operations["service1"]["operation1"])
102105
})
103106
}
104107

plugin/storage/badger/spanstore/read_write_test.go

+185
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ import (
1919
"fmt"
2020
"io"
2121
"io/ioutil"
22+
"log"
23+
"os"
24+
"runtime/pprof"
2225
"testing"
2326
"time"
2427

@@ -251,6 +254,8 @@ func TestIndexSeeks(t *testing.T) {
251254
trs, err = sr.FindTraces(context.Background(), params)
252255
assert.NoError(t, err)
253256
assert.Equal(t, 6, len(trs))
257+
assert.Equal(t, uint64(56), trs[0].Spans[0].TraceID.Low)
258+
assert.Equal(t, uint64(51), trs[5].Spans[0].TraceID.Low)
254259
})
255260
}
256261

@@ -442,3 +447,183 @@ func runFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer,
442447
}()
443448
test(tb, sw, sr)
444449
}
450+
451+
// Benchmarks intended for profiling
452+
453+
func writeSpans(sw spanstore.Writer, tags []model.KeyValue, services, operations []string, traces, spans int, high uint64, tid time.Time) {
454+
for i := 0; i < traces; i++ {
455+
for j := 0; j < spans; j++ {
456+
s := model.Span{
457+
TraceID: model.TraceID{
458+
Low: uint64(i),
459+
High: high,
460+
},
461+
SpanID: model.SpanID(j),
462+
OperationName: operations[j],
463+
Process: &model.Process{
464+
ServiceName: services[j],
465+
},
466+
Tags: tags,
467+
StartTime: tid.Add(time.Duration(time.Millisecond)),
468+
Duration: time.Duration(time.Millisecond * time.Duration(i+j)),
469+
}
470+
_ = sw.WriteSpan(&s)
471+
}
472+
}
473+
}
474+
475+
func BenchmarkWrites(b *testing.B) {
476+
runFactoryTest(b, func(tb testing.TB, sw spanstore.Writer, sr spanstore.Reader) {
477+
tid := time.Now()
478+
traces := 1000
479+
spans := 32
480+
tagsCount := 64
481+
tags, services, operations := makeWriteSupports(tagsCount, spans)
482+
483+
f, err := os.Create("writes.out")
484+
if err != nil {
485+
log.Fatal("could not create CPU profile: ", err)
486+
}
487+
if err := pprof.StartCPUProfile(f); err != nil {
488+
log.Fatal("could not start CPU profile: ", err)
489+
}
490+
defer pprof.StopCPUProfile()
491+
492+
b.ResetTimer()
493+
for a := 0; a < b.N; a++ {
494+
writeSpans(sw, tags, services, operations, traces, spans, uint64(0), tid)
495+
}
496+
b.StopTimer()
497+
})
498+
}
499+
500+
func makeWriteSupports(tagsCount, spans int) ([]model.KeyValue, []string, []string) {
501+
tags := make([]model.KeyValue, tagsCount)
502+
for i := 0; i < tagsCount; i++ {
503+
tags[i] = model.KeyValue{
504+
Key: fmt.Sprintf("a%d", i),
505+
VStr: fmt.Sprintf("b%d", i),
506+
}
507+
}
508+
operations := make([]string, spans)
509+
for j := 0; j < spans; j++ {
510+
operations[j] = fmt.Sprintf("operation-%d", j)
511+
}
512+
services := make([]string, spans)
513+
for i := 0; i < spans; i++ {
514+
services[i] = fmt.Sprintf("service-%d", i)
515+
}
516+
517+
return tags, services, operations
518+
}
519+
520+
func makeReadBenchmark(b *testing.B, tid time.Time, params *spanstore.TraceQueryParameters, outputFile string) {
521+
runLargeFactoryTest(b, func(tb testing.TB, sw spanstore.Writer, sr spanstore.Reader) {
522+
tid := time.Now()
523+
524+
// Total amount of traces is traces * tracesTimes
525+
traces := 1000
526+
tracesTimes := 1
527+
528+
// Total amount of spans written is traces * tracesTimes * spans
529+
spans := 32
530+
531+
// Default is 160k
532+
533+
tagsCount := 64
534+
tags, services, operations := makeWriteSupports(tagsCount, spans)
535+
536+
for h := 0; h < tracesTimes; h++ {
537+
writeSpans(sw, tags, services, operations, traces, spans, uint64(h), tid)
538+
}
539+
540+
f, err := os.Create(outputFile)
541+
if err != nil {
542+
log.Fatal("could not create CPU profile: ", err)
543+
}
544+
if err := pprof.StartCPUProfile(f); err != nil {
545+
log.Fatal("could not start CPU profile: ", err)
546+
}
547+
defer pprof.StopCPUProfile()
548+
549+
b.ResetTimer()
550+
for a := 0; a < b.N; a++ {
551+
sr.FindTraces(context.Background(), params)
552+
}
553+
b.StopTimer()
554+
})
555+
556+
}
557+
558+
func BenchmarkServiceTagsRangeQueryLimitIndexFetch(b *testing.B) {
559+
tid := time.Now()
560+
params := &spanstore.TraceQueryParameters{
561+
StartTimeMin: tid,
562+
StartTimeMax: tid.Add(time.Duration(time.Millisecond * 2000)),
563+
ServiceName: "service-1",
564+
Tags: map[string]string{
565+
"a8": "b8",
566+
},
567+
}
568+
569+
params.DurationMin = time.Duration(1 * time.Millisecond) // durationQuery takes 53% of total execution time..
570+
params.NumTraces = 50
571+
572+
makeReadBenchmark(b, tid, params, "scanrangeandindexlimit.out")
573+
}
574+
575+
func BenchmarkServiceIndexLimitFetch(b *testing.B) {
576+
tid := time.Now()
577+
params := &spanstore.TraceQueryParameters{
578+
StartTimeMin: tid,
579+
StartTimeMax: tid.Add(time.Duration(time.Millisecond * 2000)),
580+
ServiceName: "service-1",
581+
}
582+
583+
params.NumTraces = 50
584+
585+
makeReadBenchmark(b, tid, params, "serviceindexlimit.out")
586+
}
587+
588+
// Opens a badger db and runs a a test on it.
589+
func runLargeFactoryTest(tb testing.TB, test func(tb testing.TB, sw spanstore.Writer, sr spanstore.Reader)) {
590+
f := badger.NewFactory()
591+
opts := badger.NewOptions("badger")
592+
v, command := config.Viperize(opts.AddFlags)
593+
594+
dir := "/mnt/ssd/badger/testRun"
595+
err := os.MkdirAll(dir, 0700)
596+
defer os.RemoveAll(dir)
597+
assert.NoError(tb, err)
598+
keyParam := fmt.Sprintf("--badger.directory-key=%s", dir)
599+
valueParam := fmt.Sprintf("--badger.directory-value=%s", dir)
600+
601+
command.ParseFlags([]string{
602+
"--badger.ephemeral=false",
603+
"--badger.consistency=false", // Consistency is false as default to reduce effect of disk speed
604+
keyParam,
605+
valueParam,
606+
})
607+
608+
f.InitFromViper(v)
609+
610+
err = f.Initialize(metrics.NullFactory, zap.NewNop())
611+
assert.NoError(tb, err)
612+
613+
sw, err := f.CreateSpanWriter()
614+
assert.NoError(tb, err)
615+
616+
sr, err := f.CreateSpanReader()
617+
assert.NoError(tb, err)
618+
619+
defer func() {
620+
if closer, ok := sw.(io.Closer); ok {
621+
err := closer.Close()
622+
assert.NoError(tb, err)
623+
} else {
624+
tb.FailNow()
625+
}
626+
627+
}()
628+
test(tb, sw, sr)
629+
}

0 commit comments

Comments (0)