Skip to content

Commit ef85653

Browse files
kaseyjames-prysmrkapkaterencechain
committed
payload attribute computations in event handler (#14963)
* payload attribute computations in event handler * Skip executing nil lazyReaders * adding in clarifying comments, uncommenting needs fill, adding in happy path unit test for code coverage * gaz * fixing ineffectual assignment * nil check the Reader coming from the lazyReader * Apply suggestions from code review Radek's PR suggestion to fix error/log capitalization. Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Radek feedback * Move fire payload attribute event to after save head * set mock statenotifier in testServiceOptsWithDB --------- Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com> Co-authored-by: james-prysm <james@prysmaticlabs.com> Co-authored-by: Radosław Kapka <rkapka@wp.pl> Co-authored-by: terence tsao <terence@prysmaticlabs.com>
1 parent a8241e0 commit ef85653

8 files changed

+210
-91
lines changed

beacon-chain/blockchain/execution_engine.go

+7-28
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
7272
if arg.attributes == nil {
7373
arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version())
7474
}
75-
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), arg)
7675
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes)
7776
if err != nil {
7877
switch {
@@ -159,47 +158,27 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
159158
log.WithFields(logrus.Fields{
160159
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(arg.headRoot[:])),
161160
"headSlot": headBlk.Slot(),
161+
"nextSlot": nextSlot,
162162
"payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])),
163163
}).Info("Forkchoice updated with payload attributes for proposal")
164164
s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId)
165165
} else if hasAttr && payloadID == nil && !features.Get().PrepareAllPayloads {
166166
log.WithFields(logrus.Fields{
167167
"blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()),
168168
"slot": headBlk.Slot(),
169+
"nextSlot": nextSlot,
169170
}).Error("Received nil payload ID on VALID engine response")
170171
}
171172
return payloadID, nil
172173
}
173174

174-
func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, cfg *fcuConfig) {
175-
pidx, err := helpers.BeaconProposerIndex(ctx, cfg.headState)
176-
if err != nil {
177-
log.WithError(err).
178-
WithField("head_root", cfg.headRoot[:]).
179-
Error("Could not get proposer index for PayloadAttributes event")
180-
return
181-
}
182-
evd := payloadattribute.EventData{
183-
ProposerIndex: pidx,
184-
ProposalSlot: cfg.headState.Slot(),
185-
ParentBlockRoot: cfg.headRoot[:],
186-
Attributer: cfg.attributes,
187-
HeadRoot: cfg.headRoot,
188-
HeadState: cfg.headState,
189-
HeadBlock: cfg.headBlock,
190-
}
191-
if cfg.headBlock != nil && !cfg.headBlock.IsNil() {
192-
headPayload, err := cfg.headBlock.Block().Body().Execution()
193-
if err != nil {
194-
log.WithError(err).Error("Could not get execution payload for head block")
195-
return
196-
}
197-
evd.ParentBlockHash = headPayload.BlockHash()
198-
evd.ParentBlockNumber = headPayload.BlockNumber()
199-
}
175+
func firePayloadAttributesEvent(_ context.Context, f event.SubscriberSender, nextSlot primitives.Slot) {
176+
// the fcu args have differing amounts of completeness based on the code path,
177+
// and there is work we only want to do if a client is actually listening to the events beacon api endpoint.
178+
// temporary solution: just fire a blank event and fill in the details in the api handler.
200179
f.Send(&feed.Event{
201180
Type: statefeed.PayloadAttributes,
202-
Data: evd,
181+
Data: payloadattribute.EventData{ProposalSlot: nextSlot},
203182
})
204183
}
205184

beacon-chain/blockchain/forkchoice_update_execution.go

+2
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo
102102
log.WithError(err).Error("could not save head")
103103
}
104104

105+
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
106+
105107
// Only need to prune attestations from pool if the head has changed.
106108
if err := s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock); err != nil {
107109
log.WithError(err).Error("could not prune attestations from pool")

beacon-chain/blockchain/mock_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package blockchain
33
import (
44
"testing"
55

6+
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
67
testDB "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
78
doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree"
89
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
@@ -18,6 +19,7 @@ func testServiceOptsWithDB(t *testing.T) []Option {
1819
WithStateGen(stategen.New(beaconDB, fcs)),
1920
WithForkChoiceStore(fcs),
2021
WithClockSynchronizer(cs),
22+
WithStateNotifier(&mock.MockStateNotifier{RecordEvents: true}),
2123
}
2224
}
2325

beacon-chain/blockchain/process_block.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -723,13 +723,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
723723
attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:])
724724
// return early if we are not proposing next slot
725725
if attribute.IsEmpty() {
726-
fcuArgs := &fcuConfig{
727-
headState: headState,
728-
headRoot: headRoot,
729-
headBlock: nil,
730-
attributes: attribute,
731-
}
732-
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), fcuArgs)
726+
// notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't
727+
// call notifyForkchoiceUpdate, so the event is fired here.
728+
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
733729
return
734730
}
735731

beacon-chain/rpc/eth/events/BUILD.bazel

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ go_library(
1818
"//beacon-chain/core/feed/operation:go_default_library",
1919
"//beacon-chain/core/feed/state:go_default_library",
2020
"//beacon-chain/core/helpers:go_default_library",
21-
"//beacon-chain/core/time:go_default_library",
21+
"//beacon-chain/core/transition:go_default_library",
22+
"//beacon-chain/state:go_default_library",
2223
"//config/params:go_default_library",
2324
"//consensus-types/interfaces:go_default_library",
2425
"//consensus-types/payload-attribute:go_default_library",
@@ -58,6 +59,7 @@ go_test(
5859
"//consensus-types/interfaces:go_default_library",
5960
"//consensus-types/payload-attribute:go_default_library",
6061
"//consensus-types/primitives:go_default_library",
62+
"//proto/engine/v1:go_default_library",
6163
"//proto/eth/v1:go_default_library",
6264
"//proto/prysm/v1alpha1:go_default_library",
6365
"//testing/require:go_default_library",

beacon-chain/rpc/eth/events/events.go

+85-55
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"runtime/debug"
1011
"strconv"
1112
"time"
1213

@@ -20,7 +21,8 @@ import (
2021
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
2122
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
2223
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
23-
chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
24+
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
25+
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
2426
"github.com/prysmaticlabs/prysm/v5/config/params"
2527
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
2628
payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute"
@@ -352,9 +354,18 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea
352354
if r := recover(); r != nil {
353355
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
354356
err = errWriterUnusable
357+
debug.PrintStack()
355358
}
356359
}()
360+
if lr == nil {
361+
log.Warn("Event stream skipping a nil lazy event reader callback")
362+
return nil
363+
}
357364
r := lr()
365+
if r == nil {
366+
log.Warn("Event stream skipping a nil event reader")
367+
return nil
368+
}
358369
out, err := io.ReadAll(r)
359370
if err != nil {
360371
return err
@@ -600,62 +611,45 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
600611

601612
var errUnsupportedPayloadAttribute = errors.New("cannot compute payload attributes pre-Bellatrix")
602613

603-
func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.Attributer, error) {
604-
v := ev.HeadState.Version()
614+
func (s *Server) computePayloadAttributes(ctx context.Context, st state.ReadOnlyBeaconState, root [32]byte, proposer primitives.ValidatorIndex, timestamp uint64, randao []byte) (payloadattribute.Attributer, error) {
615+
v := st.Version()
605616
if v < version.Bellatrix {
606617
return nil, errors.Wrapf(errUnsupportedPayloadAttribute, "%s is not supported", version.String(v))
607618
}
608619

609-
t, err := slots.ToTime(ev.HeadState.GenesisTime(), ev.HeadState.Slot())
610-
if err != nil {
611-
return nil, errors.Wrap(err, "could not get head state slot time")
612-
}
613-
timestamp := uint64(t.Unix())
614-
prevRando, err := helpers.RandaoMix(ev.HeadState, chaintime.CurrentEpoch(ev.HeadState))
615-
if err != nil {
616-
return nil, errors.Wrap(err, "could not get head state randao mix")
617-
}
618-
proposerIndex, err := helpers.BeaconProposerIndex(ctx, ev.HeadState)
619-
if err != nil {
620-
return nil, errors.Wrap(err, "could not get head state proposer index")
621-
}
622620
feeRecpt := params.BeaconConfig().DefaultFeeRecipient.Bytes()
623-
tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex)
621+
tValidator, exists := s.TrackedValidatorsCache.Validator(proposer)
624622
if exists {
625623
feeRecpt = tValidator.FeeRecipient[:]
626624
}
627625

628626
if v == version.Bellatrix {
629627
return payloadattribute.New(&engine.PayloadAttributes{
630628
Timestamp: timestamp,
631-
PrevRandao: prevRando,
629+
PrevRandao: randao,
632630
SuggestedFeeRecipient: feeRecpt,
633631
})
634632
}
635633

636-
w, _, err := ev.HeadState.ExpectedWithdrawals()
634+
w, _, err := st.ExpectedWithdrawals()
637635
if err != nil {
638636
return nil, errors.Wrap(err, "could not get withdrawals from head state")
639637
}
640638
if v == version.Capella {
641639
return payloadattribute.New(&engine.PayloadAttributesV2{
642640
Timestamp: timestamp,
643-
PrevRandao: prevRando,
641+
PrevRandao: randao,
644642
SuggestedFeeRecipient: feeRecpt,
645643
Withdrawals: w,
646644
})
647645
}
648646

649-
pr, err := ev.HeadBlock.Block().HashTreeRoot()
650-
if err != nil {
651-
return nil, errors.Wrap(err, "could not compute head block root")
652-
}
653647
return payloadattribute.New(&engine.PayloadAttributesV3{
654648
Timestamp: timestamp,
655-
PrevRandao: prevRando,
649+
PrevRandao: randao,
656650
SuggestedFeeRecipient: feeRecpt,
657651
Withdrawals: w,
658-
ParentBeaconBlockRoot: pr[:],
652+
ParentBeaconBlockRoot: root[:],
659653
})
660654
}
661655

@@ -665,37 +659,75 @@ type asyncPayloadAttrData struct {
665659
err error
666660
}
667661

662+
var zeroRoot [32]byte
663+
664+
// needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have
665+
// all of the checked fields empty, so the logical short circuit should hit immediately.
666+
func needsFill(ev payloadattribute.EventData) bool {
667+
return ev.HeadState == nil || ev.HeadState.IsNil() || ev.HeadState.LatestBlockHeader() == nil ||
668+
ev.HeadBlock == nil || ev.HeadBlock.IsNil() ||
669+
ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 ||
670+
ev.Attributer == nil || ev.Attributer.IsEmpty()
671+
}
672+
668673
func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) {
669-
if ev.HeadBlock == nil || ev.HeadBlock.IsNil() {
670-
hb, err := s.HeadFetcher.HeadBlock(ctx)
671-
if err != nil {
672-
return ev, errors.Wrap(err, "Could not look up head block")
673-
}
674-
root, err := hb.Block().HashTreeRoot()
675-
if err != nil {
676-
return ev, errors.Wrap(err, "Could not compute head block root")
677-
}
678-
if ev.HeadRoot != root {
679-
return ev, errors.Wrap(err, "head root changed before payload attribute event handler execution")
680-
}
681-
ev.HeadBlock = hb
682-
payload, err := hb.Block().Body().Execution()
683-
if err != nil {
684-
return ev, errors.Wrap(err, "Could not get execution payload for head block")
685-
}
686-
ev.ParentBlockHash = payload.BlockHash()
687-
ev.ParentBlockNumber = payload.BlockNumber()
674+
var err error
675+
676+
if !needsFill(ev) {
677+
return ev, nil
688678
}
689679

690-
attr := ev.Attributer
691-
if attr == nil || attr.IsEmpty() {
692-
attr, err := s.computePayloadAttributes(ctx, ev)
680+
ev.HeadState, err = s.HeadFetcher.HeadState(ctx)
681+
if err != nil {
682+
return ev, errors.Wrap(err, "could not get head state")
683+
}
684+
685+
ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx)
686+
if err != nil {
687+
return ev, errors.Wrap(err, "could not look up head block")
688+
}
689+
ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot()
690+
if err != nil {
691+
return ev, errors.Wrap(err, "could not compute head block root")
692+
}
693+
pr := ev.HeadBlock.Block().ParentRoot()
694+
ev.ParentBlockRoot = pr[:]
695+
696+
hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot()
697+
if err != nil {
698+
return ev, errors.Wrap(err, "could not compute latest block header root")
699+
}
700+
701+
pse := slots.ToEpoch(ev.ProposalSlot)
702+
st := ev.HeadState
703+
if slots.ToEpoch(st.Slot()) != pse {
704+
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot)
693705
if err != nil {
694-
return ev, errors.Wrap(err, "Could not compute payload attributes")
706+
return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch")
695707
}
696-
ev.Attributer = attr
697708
}
698-
return ev, nil
709+
ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot)
710+
if err != nil {
711+
return ev, errors.Wrap(err, "failed to compute proposer index")
712+
}
713+
randao, err := helpers.RandaoMix(st, pse)
714+
if err != nil {
715+
return ev, errors.Wrap(err, "could not get head state randado")
716+
}
717+
718+
payload, err := ev.HeadBlock.Block().Body().Execution()
719+
if err != nil {
720+
return ev, errors.Wrap(err, "could not get execution payload for head block")
721+
}
722+
ev.ParentBlockHash = payload.BlockHash()
723+
ev.ParentBlockNumber = payload.BlockNumber()
724+
725+
t, err := slots.ToTime(st.GenesisTime(), ev.ProposalSlot)
726+
if err != nil {
727+
return ev, errors.Wrap(err, "could not get head state slot time")
728+
}
729+
ev.Attributer, err = s.computePayloadAttributes(ctx, st, hsr, ev.ProposerIndex, uint64(t.Unix()), randao)
730+
return ev, err
699731
}
700732

701733
// This event stream is intended to be used by builders and relays.
@@ -704,10 +736,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
704736
ctx, cancel := context.WithTimeout(ctx, payloadAttributeTimeout)
705737
edc := make(chan asyncPayloadAttrData)
706738
go func() {
707-
d := asyncPayloadAttrData{
708-
version: version.String(ev.HeadState.Version()),
709-
}
710-
739+
d := asyncPayloadAttrData{}
711740
defer func() {
712741
edc <- d
713742
}()
@@ -716,6 +745,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
716745
d.err = errors.Wrap(err, "Could not fill event data")
717746
return
718747
}
748+
d.version = version.String(ev.HeadBlock.Version())
719749
attributesBytes, err := marshalAttributes(ev.Attributer)
720750
if err != nil {
721751
d.err = errors.Wrap(err, "errors marshaling payload attributes to json")

0 commit comments

Comments
 (0)