Skip to content

Commit 35a9852

Browse files
committed
chore(events): improve perf for parallel event filter matching
1. Cache address look-ups for the given tipset across filters 2. Run the filters in an errgroup
1 parent 773efae commit 35a9852

File tree

3 files changed

+59
-31
lines changed

3 files changed

+59
-31
lines changed

chain/events/filter/event.go

+50-23
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/ipfs/go-cid"
1111
cbg "github.com/whyrusleeping/cbor-gen"
12+
"golang.org/x/sync/errgroup"
1213
"golang.org/x/xerrors"
1314

1415
"github.com/filecoin-project/go-address"
@@ -28,7 +29,10 @@ func isIndexedValue(b uint8) bool {
2829
return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0
2930
}
3031

31-
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.Address, bool)
32+
// AddressResolver is a function that resolves an actor ID to an address. If the
33+
// actor ID cannot be resolved to an address, the function should return
34+
// address.Undef.
35+
type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) address.Address
3236

3337
type EventFilter interface {
3438
Filter
@@ -77,9 +81,6 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
7781
return nil
7882
}
7983

80-
// cache of lookups between actor id and f4 address
81-
addressLookups := make(map[abi.ActorID]address.Address)
82-
8384
ems, err := te.messages(ctx)
8485
if err != nil {
8586
return xerrors.Errorf("load executed messages: %w", err)
@@ -89,16 +90,10 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
8990

9091
for msgIdx, em := range ems {
9192
for _, ev := range em.Events() {
92-
// lookup address corresponding to the actor id
93-
addr, found := addressLookups[ev.Emitter]
94-
if !found {
95-
var ok bool
96-
addr, ok = resolver(ctx, ev.Emitter, te.rctTs)
97-
if !ok {
98-
// not an address we will be able to match against
99-
continue
100-
}
101-
addressLookups[ev.Emitter] = addr
93+
addr := resolver(ctx, ev.Emitter, te.rctTs)
94+
if addr == address.Undef {
95+
// not an address we will be able to match against
96+
continue
10297
}
10398

10499
if !f.matchAddress(addr) {
@@ -295,7 +290,7 @@ func (e *executedMessage) Events() []*types.Event {
295290

296291
type EventFilterManager struct {
297292
ChainStore *cstore.ChainStore
298-
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
293+
AddressResolver AddressResolver
299294
MaxFilterResults int
300295
ChainIndexer index.Indexer
301296

@@ -319,11 +314,18 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
319314
load: m.loadExecutedMessages,
320315
}
321316

322-
// TODO: could run this loop in parallel with errgroup if there are many filters
317+
tsAddressResolver := m.createTipSetCachedAddressResolver()
318+
319+
g, ctx := errgroup.WithContext(ctx)
323320
for _, f := range m.filters {
324-
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
325-
return err
326-
}
321+
f := f
322+
g.Go(func() error {
323+
return f.CollectEvents(ctx, tse, false, tsAddressResolver)
324+
})
325+
}
326+
327+
if err := g.Wait(); err != nil {
328+
return err
327329
}
328330

329331
return nil
@@ -344,16 +346,41 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
344346
load: m.loadExecutedMessages,
345347
}
346348

347-
// TODO: could run this loop in parallel with errgroup if there are many filters
349+
tsAddressResolver := m.createTipSetCachedAddressResolver()
350+
351+
g, ctx := errgroup.WithContext(ctx)
348352
for _, f := range m.filters {
349-
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
350-
return err
351-
}
353+
f := f
354+
g.Go(func() error {
355+
return f.CollectEvents(ctx, tse, true, tsAddressResolver)
356+
})
357+
}
358+
359+
if err := g.Wait(); err != nil {
360+
return err
352361
}
353362

354363
return nil
355364
}
356365

366+
func (m *EventFilterManager) createTipSetCachedAddressResolver() func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
367+
addressLookups := make(map[abi.ActorID]address.Address)
368+
var addressLookupsLk sync.Mutex
369+
370+
return func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
371+
addressLookupsLk.Lock()
372+
defer addressLookupsLk.Unlock()
373+
374+
addr, ok := addressLookups[emitter]
375+
if !ok {
376+
addr = m.AddressResolver(ctx, emitter, ts)
377+
addressLookups[emitter] = addr
378+
}
379+
380+
return addr
381+
}
382+
}
383+
357384
func (m *EventFilterManager) Fill(
358385
ctx context.Context,
359386
minHeight,

chain/events/filter/event_test.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,10 @@ func (a addressMap) add(actorID abi.ActorID, addr address.Address) {
441441
a[actorID] = addr
442442
}
443443

444-
func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
444+
func (a addressMap) ResolveAddress(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
445445
ra, ok := a[emitter]
446-
return ra, ok
446+
if ok {
447+
return ra
448+
}
449+
return address.Undef
447450
}

node/modules/actorevent.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -105,20 +105,18 @@ func EventFilterManager(cfg config.EventsConfig) func(helpers.MetricsCtx, repo.L
105105
fm := &filter.EventFilterManager{
106106
ChainStore: cs,
107107
ChainIndexer: ci,
108-
// TODO:
109-
// We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands
110-
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) {
108+
AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) address.Address {
111109
idAddr, err := address.NewIDAddress(uint64(emitter))
112110
if err != nil {
113-
return address.Undef, false
111+
return address.Undef
114112
}
115113

116114
actor, err := sm.LoadActor(ctx, idAddr, ts)
117115
if err != nil || actor.DelegatedAddress == nil {
118-
return idAddr, true
116+
return idAddr
119117
}
120118

121-
return *actor.DelegatedAddress, true
119+
return *actor.DelegatedAddress
122120
},
123121

124122
MaxFilterResults: cfg.MaxFilterResults,

0 commit comments

Comments
 (0)