Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

payload attribute computations in event handler #14963

Merged
merged 11 commits into from
Mar 7, 2025
35 changes: 7 additions & 28 deletions beacon-chain/blockchain/execution_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
if arg.attributes == nil {
arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version())
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), arg)
payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes)
if err != nil {
switch {
Expand Down Expand Up @@ -159,47 +158,27 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (*
log.WithFields(logrus.Fields{
"blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(arg.headRoot[:])),
"headSlot": headBlk.Slot(),
"nextSlot": nextSlot,
"payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])),
}).Info("Forkchoice updated with payload attributes for proposal")
s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId)
} else if hasAttr && payloadID == nil && !features.Get().PrepareAllPayloads {
log.WithFields(logrus.Fields{
"blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()),
"slot": headBlk.Slot(),
"nextSlot": nextSlot,
}).Error("Received nil payload ID on VALID engine response")
}
return payloadID, nil
}

func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, cfg *fcuConfig) {
pidx, err := helpers.BeaconProposerIndex(ctx, cfg.headState)
if err != nil {
log.WithError(err).
WithField("head_root", cfg.headRoot[:]).
Error("Could not get proposer index for PayloadAttributes event")
return
}
evd := payloadattribute.EventData{
ProposerIndex: pidx,
ProposalSlot: cfg.headState.Slot(),
ParentBlockRoot: cfg.headRoot[:],
Attributer: cfg.attributes,
HeadRoot: cfg.headRoot,
HeadState: cfg.headState,
HeadBlock: cfg.headBlock,
}
if cfg.headBlock != nil && !cfg.headBlock.IsNil() {
headPayload, err := cfg.headBlock.Block().Body().Execution()
if err != nil {
log.WithError(err).Error("Could not get execution payload for head block")
return
}
evd.ParentBlockHash = headPayload.BlockHash()
evd.ParentBlockNumber = headPayload.BlockNumber()
}
func firePayloadAttributesEvent(_ context.Context, f event.SubscriberSender, nextSlot primitives.Slot) {
// the fcu args have differing amounts of completeness based on the code path,
// and there is work we only want to do if a client is actually listening to the events beacon api endpoint.
// temporary solution: just fire a blank event and fill in the details in the api handler.
Comment on lines +176 to +178
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing the cfg parameter means that in case of firing from notifyForkchoiceUpdate you have to do extra work in the handler, since you need to recompute what you already have available. What about leaving the parameter and filling event data only when cfg != nil? You can then pass nil as the value in lateBlockTasks.

Copy link
Contributor Author

@kasey kasey Mar 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing it this way sidesteps several issues:

  • these code paths are already complex and error prone. let's minimize the amount of edge cases that could be coming in from callers that this function needs to anticipate.
  • timing concerns that could arise from the node passing into the next slot while the event waits in queue
  • concerns around head state being mutated / needing to make a copy
  • doing needless work when nothing is actually listening to the event. we have learned that most nodes in the wild are not subscribed to events, so it makes sense to defer all processing until we know the feed has a listener (in the event handler).

f.Send(&feed.Event{
Type: statefeed.PayloadAttributes,
Data: evd,
Data: payloadattribute.EventData{ProposalSlot: nextSlot},
})
}

Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/forkchoice_update_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo
log.WithError(err).Error("could not save head")
}

go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)

// Only need to prune attestations from pool if the head has changed.
if err := s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock); err != nil {
log.WithError(err).Error("could not prune attestations from pool")
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockchain
import (
"testing"

mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
testDB "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
Expand All @@ -18,6 +19,7 @@ func testServiceOptsWithDB(t *testing.T) []Option {
WithStateGen(stategen.New(beaconDB, fcs)),
WithForkChoiceStore(fcs),
WithClockSynchronizer(cs),
WithStateNotifier(&mock.MockStateNotifier{RecordEvents: true}),
}
}

Expand Down
10 changes: 3 additions & 7 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,13 +723,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) {
attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:])
// return early if we are not proposing next slot
if attribute.IsEmpty() {
fcuArgs := &fcuConfig{
headState: headState,
headRoot: headRoot,
headBlock: nil,
attributes: attribute,
}
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), fcuArgs)
// notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't
// call notifyForkchoiceUpdate, so the event is fired here.
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
return
}

Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/rpc/eth/events/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ go_library(
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/payload-attribute:go_default_library",
Expand Down Expand Up @@ -58,6 +59,7 @@ go_test(
"//consensus-types/interfaces:go_default_library",
"//consensus-types/payload-attribute:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",
Expand Down
140 changes: 85 additions & 55 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"runtime/debug"
"strconv"
"time"

Expand All @@ -20,7 +21,8 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute"
Expand Down Expand Up @@ -352,9 +354,18 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea
if r := recover(); r != nil {
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
err = errWriterUnusable
debug.PrintStack()
}
}()
if lr == nil {
log.Warn("Event stream skipping a nil lazy event reader callback")
return nil
}
r := lr()
if r == nil {
log.Warn("Event stream skipping a nil event reader")
return nil
}
out, err := io.ReadAll(r)
if err != nil {
return err
Expand Down Expand Up @@ -600,62 +611,45 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi

var errUnsupportedPayloadAttribute = errors.New("cannot compute payload attributes pre-Bellatrix")

func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.Attributer, error) {
v := ev.HeadState.Version()
func (s *Server) computePayloadAttributes(ctx context.Context, st state.ReadOnlyBeaconState, root [32]byte, proposer primitives.ValidatorIndex, timestamp uint64, randao []byte) (payloadattribute.Attributer, error) {
v := st.Version()
if v < version.Bellatrix {
return nil, errors.Wrapf(errUnsupportedPayloadAttribute, "%s is not supported", version.String(v))
}

t, err := slots.ToTime(ev.HeadState.GenesisTime(), ev.HeadState.Slot())
if err != nil {
return nil, errors.Wrap(err, "could not get head state slot time")
}
timestamp := uint64(t.Unix())
prevRando, err := helpers.RandaoMix(ev.HeadState, chaintime.CurrentEpoch(ev.HeadState))
if err != nil {
return nil, errors.Wrap(err, "could not get head state randao mix")
}
proposerIndex, err := helpers.BeaconProposerIndex(ctx, ev.HeadState)
if err != nil {
return nil, errors.Wrap(err, "could not get head state proposer index")
}
feeRecpt := params.BeaconConfig().DefaultFeeRecipient.Bytes()
tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex)
tValidator, exists := s.TrackedValidatorsCache.Validator(proposer)
if exists {
feeRecpt = tValidator.FeeRecipient[:]
}

if v == version.Bellatrix {
return payloadattribute.New(&engine.PayloadAttributes{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
})
}

w, _, err := ev.HeadState.ExpectedWithdrawals()
w, _, err := st.ExpectedWithdrawals()
if err != nil {
return nil, errors.Wrap(err, "could not get withdrawals from head state")
}
if v == version.Capella {
return payloadattribute.New(&engine.PayloadAttributesV2{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
Withdrawals: w,
})
}

pr, err := ev.HeadBlock.Block().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not compute head block root")
}
return payloadattribute.New(&engine.PayloadAttributesV3{
Timestamp: timestamp,
PrevRandao: prevRando,
PrevRandao: randao,
SuggestedFeeRecipient: feeRecpt,
Withdrawals: w,
ParentBeaconBlockRoot: pr[:],
ParentBeaconBlockRoot: root[:],
})
}

Expand All @@ -665,37 +659,75 @@ type asyncPayloadAttrData struct {
err error
}

var zeroRoot [32]byte

// needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have
// all of the checked fields empty, so the logical short circuit should hit immediately.
func needsFill(ev payloadattribute.EventData) bool {
return ev.HeadState == nil || ev.HeadState.IsNil() || ev.HeadState.LatestBlockHeader() == nil ||
ev.HeadBlock == nil || ev.HeadBlock.IsNil() ||
ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 ||
ev.Attributer == nil || ev.Attributer.IsEmpty()
}

func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) {
if ev.HeadBlock == nil || ev.HeadBlock.IsNil() {
hb, err := s.HeadFetcher.HeadBlock(ctx)
if err != nil {
return ev, errors.Wrap(err, "Could not look up head block")
}
root, err := hb.Block().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "Could not compute head block root")
}
if ev.HeadRoot != root {
return ev, errors.Wrap(err, "head root changed before payload attribute event handler execution")
}
ev.HeadBlock = hb
payload, err := hb.Block().Body().Execution()
if err != nil {
return ev, errors.Wrap(err, "Could not get execution payload for head block")
}
ev.ParentBlockHash = payload.BlockHash()
ev.ParentBlockNumber = payload.BlockNumber()
var err error

if !needsFill(ev) {
return ev, nil
}

attr := ev.Attributer
if attr == nil || attr.IsEmpty() {
attr, err := s.computePayloadAttributes(ctx, ev)
ev.HeadState, err = s.HeadFetcher.HeadState(ctx)
if err != nil {
return ev, errors.Wrap(err, "could not get head state")
}

ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx)
if err != nil {
return ev, errors.Wrap(err, "could not look up head block")
}
ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute head block root")
}
pr := ev.HeadBlock.Block().ParentRoot()
ev.ParentBlockRoot = pr[:]

hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot()
if err != nil {
return ev, errors.Wrap(err, "could not compute latest block header root")
}

pse := slots.ToEpoch(ev.ProposalSlot)
st := ev.HeadState
if slots.ToEpoch(st.Slot()) != pse {
st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "Could not compute payload attributes")
return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch")
}
ev.Attributer = attr
}
return ev, nil
ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "failed to compute proposer index")
}
randao, err := helpers.RandaoMix(st, pse)
if err != nil {
return ev, errors.Wrap(err, "could not get head state randado")
}

payload, err := ev.HeadBlock.Block().Body().Execution()
if err != nil {
return ev, errors.Wrap(err, "could not get execution payload for head block")
}
ev.ParentBlockHash = payload.BlockHash()
ev.ParentBlockNumber = payload.BlockNumber()

t, err := slots.ToTime(st.GenesisTime(), ev.ProposalSlot)
if err != nil {
return ev, errors.Wrap(err, "could not get head state slot time")
}
ev.Attributer, err = s.computePayloadAttributes(ctx, st, hsr, ev.ProposerIndex, uint64(t.Unix()), randao)
return ev, err
}

// This event stream is intended to be used by builders and relays.
Expand All @@ -704,10 +736,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
ctx, cancel := context.WithTimeout(ctx, payloadAttributeTimeout)
edc := make(chan asyncPayloadAttrData)
go func() {
d := asyncPayloadAttrData{
version: version.String(ev.HeadState.Version()),
}

d := asyncPayloadAttrData{}
defer func() {
edc <- d
}()
Expand All @@ -716,6 +745,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut
d.err = errors.Wrap(err, "Could not fill event data")
return
}
d.version = version.String(ev.HeadBlock.Version())
attributesBytes, err := marshalAttributes(ev.Attributer)
if err != nil {
d.err = errors.Wrap(err, "errors marshaling payload attributes to json")
Expand Down
Loading
Loading