From 7d67255a97ae5ef38d830ad7930e7e5b90f20bf8 Mon Sep 17 00:00:00 2001 From: Kasey Kirkham Date: Wed, 19 Feb 2025 15:47:31 -0600 Subject: [PATCH 01/10] payload attribute computations in event handler --- beacon-chain/blockchain/execution_engine.go | 31 +---- beacon-chain/blockchain/process_block.go | 10 +- beacon-chain/rpc/eth/events/BUILD.bazel | 3 +- beacon-chain/rpc/eth/events/events.go | 135 ++++++++++++-------- changelog/kasey_event-slot-fix.md | 2 + 5 files changed, 89 insertions(+), 92 deletions(-) create mode 100644 changelog/kasey_event-slot-fix.md diff --git a/beacon-chain/blockchain/execution_engine.go b/beacon-chain/blockchain/execution_engine.go index 45e29176ec36..2a54be1c7924 100644 --- a/beacon-chain/blockchain/execution_engine.go +++ b/beacon-chain/blockchain/execution_engine.go @@ -72,7 +72,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* if arg.attributes == nil { arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version()) } - go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), arg) + go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes) if err != nil { switch { @@ -171,35 +171,10 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* return payloadID, nil } -func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, cfg *fcuConfig) { - pidx, err := helpers.BeaconProposerIndex(ctx, cfg.headState) - if err != nil { - log.WithError(err). - WithField("head_root", cfg.headRoot[:]). - Error("Could not get proposer index for PayloadAttributes event") - return - } - evd := payloadattribute.EventData{ - ProposerIndex: pidx, - ProposalSlot: cfg.headState.Slot(), - ParentBlockRoot: cfg.headRoot[:], - Attributer: cfg.attributes, - HeadRoot: cfg.headRoot, - HeadState: cfg.headState, - HeadBlock: cfg.headBlock, - } - if cfg.headBlock != nil && !cfg.headBlock.IsNil() { - headPayload, err := cfg.headBlock.Block().Body().Execution() - if err != nil { - log.WithError(err).Error("Could not get execution payload for head block") - return - } - evd.ParentBlockHash = headPayload.BlockHash() - evd.ParentBlockNumber = headPayload.BlockNumber() - } +func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, slot primitives.Slot) { f.Send(&feed.Event{ Type: statefeed.PayloadAttributes, - Data: evd, + Data: payloadattribute.EventData{ProposalSlot: slot}, }) } diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 07a49b6e69ea..a799ce52cee0 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -723,13 +723,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) { attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:]) // return early if we are not proposing next slot if attribute.IsEmpty() { - fcuArgs := &fcuConfig{ - headState: headState, - headRoot: headRoot, - headBlock: nil, - attributes: attribute, - } - go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), fcuArgs) + // notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't + // call notifyForkchoiceUpdate, so the event is fired here. + go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) return } diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index 9bbea34e1c06..0a633a87f500 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -18,7 +18,8 @@ go_library( "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", - "//beacon-chain/core/time:go_default_library", + "//beacon-chain/core/transition:go_default_library", + "//beacon-chain/state:go_default_library", "//config/params:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/payload-attribute:go_default_library", diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index b1ba8e2f1702..54a483c093b6 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "runtime/debug" "strconv" "time" @@ -20,7 +21,8 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" - chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute" @@ -352,6 +354,7 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea if r := recover(); r != nil { log.WithField("panic", r).Error("Recovered from panic while writing event to client.") err = errWriterUnusable + debug.PrintStack() } }() r := lr() @@ -600,27 +603,14 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi var errUnsupportedPayloadAttribute = errors.New("cannot compute payload attributes pre-Bellatrix") -func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.Attributer, error) { - v := ev.HeadState.Version() +func (s *Server) computePayloadAttributes(ctx context.Context, st state.ReadOnlyBeaconState, root [32]byte, proposer primitives.ValidatorIndex, timestamp uint64, randao []byte) (payloadattribute.Attributer, error) { + v := st.Version() if v < version.Bellatrix { return nil, errors.Wrapf(errUnsupportedPayloadAttribute, "%s is not supported", version.String(v)) } - t, err := slots.ToTime(ev.HeadState.GenesisTime(), ev.HeadState.Slot()) - if err != nil { - return nil, errors.Wrap(err, "could not get head state slot time") - } - timestamp := uint64(t.Unix()) - prevRando, err := helpers.RandaoMix(ev.HeadState, chaintime.CurrentEpoch(ev.HeadState)) - if err != nil { - return nil, errors.Wrap(err, "could not get head state randao mix") - } - proposerIndex, err := helpers.BeaconProposerIndex(ctx, ev.HeadState) - if err != nil { - return nil, errors.Wrap(err, "could not get head state proposer index") - } feeRecpt := params.BeaconConfig().DefaultFeeRecipient.Bytes() - tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex) + tValidator, exists := s.TrackedValidatorsCache.Validator(proposer) if exists { feeRecpt = tValidator.FeeRecipient[:] } @@ -628,34 +618,30 @@ func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribu if v == version.Bellatrix { return payloadattribute.New(&engine.PayloadAttributes{ Timestamp: timestamp, - PrevRandao: prevRando, + PrevRandao: randao, SuggestedFeeRecipient: feeRecpt, }) } - w, _, err := ev.HeadState.ExpectedWithdrawals() + w, _, err := st.ExpectedWithdrawals() if err != nil { return nil, errors.Wrap(err, "could not get withdrawals from head state") } if v == version.Capella { return payloadattribute.New(&engine.PayloadAttributesV2{ Timestamp: timestamp, - PrevRandao: prevRando, + PrevRandao: randao, SuggestedFeeRecipient: feeRecpt, Withdrawals: w, }) } - pr, err := ev.HeadBlock.Block().HashTreeRoot() - if err != nil { - return nil, errors.Wrap(err, "could not compute head block root") - } return payloadattribute.New(&engine.PayloadAttributesV3{ Timestamp: timestamp, - PrevRandao: prevRando, + PrevRandao: randao, SuggestedFeeRecipient: feeRecpt, Withdrawals: w, - ParentBeaconBlockRoot: pr[:], + ParentBeaconBlockRoot: root[:], }) } @@ -665,37 +651,76 @@ type asyncPayloadAttrData struct { err error } +var zeroRoot [32]byte + +// needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have +// all of the checked fields empty, so the logical short circuit should hit immediately. +func needsFill(ev payloadattribute.EventData) bool { + return ev.HeadState == nil || ev.HeadState.IsNil() || + ev.HeadState.LatestBlockHeader() == nil || ev.HeadState.LatestBlockHeader() == nil || + ev.HeadBlock == nil || ev.HeadBlock.IsNil() || + ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 || + ev.Attributer == nil || ev.Attributer.IsEmpty() +} + func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) { - if ev.HeadBlock == nil || ev.HeadBlock.IsNil() { - hb, err := s.HeadFetcher.HeadBlock(ctx) - if err != nil { - return ev, errors.Wrap(err, "Could not look up head block") - } - root, err := hb.Block().HashTreeRoot() - if err != nil { - return ev, errors.Wrap(err, "Could not compute head block root") - } - if ev.HeadRoot != root { - return ev, errors.Wrap(err, "head root changed before payload attribute event handler execution") - } - ev.HeadBlock = hb - payload, err := hb.Block().Body().Execution() - if err != nil { - return ev, errors.Wrap(err, "Could not get execution payload for head block") + var err error + /* + if !needsFill(ev) { + return ev, nil } - ev.ParentBlockHash = payload.BlockHash() - ev.ParentBlockNumber = payload.BlockNumber() + */ + ev.HeadState, err = s.HeadFetcher.HeadState(ctx) + if err != nil { + return ev, errors.Wrap(err, "Could not get head state") } - attr := ev.Attributer - if attr == nil || attr.IsEmpty() { - attr, err := s.computePayloadAttributes(ctx, ev) + ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx) + if err != nil { + return ev, errors.Wrap(err, "Could not look up head block") + } + ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot() + if err != nil { + return ev, errors.Wrap(err, "Could not compute head block root") + } + pr := ev.HeadBlock.Block().ParentRoot() + ev.ParentBlockRoot = pr[:] + + hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot() + if err != nil { + return ev, errors.Wrap(err, "Could not compute latest block header root") + } + + pse := slots.ToEpoch(ev.ProposalSlot) + st := ev.HeadState + if slots.ToEpoch(st.Slot()) != pse { + st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot) if err != nil { - return ev, errors.Wrap(err, "Could not compute payload attributes") + return ev, errors.Wrap(err, "Could not run process blocks on head state into the proposal slot epoch") } - ev.Attributer = attr } - return ev, nil + ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot) + if err != nil { + return ev, errors.Wrap(err, "failed to compute proposer index") + } + randao, err := helpers.RandaoMix(st, pse) + if err != nil { + return ev, errors.Wrap(err, "could not get head state randado") + } + + payload, err := ev.HeadBlock.Block().Body().Execution() + if err != nil { + return ev, errors.Wrap(err, "Could not get execution payload for head block") + } + ev.ParentBlockHash = payload.BlockHash() + ev.ParentBlockNumber = payload.BlockNumber() + + t, err := slots.ToTime(st.GenesisTime(), ev.ProposalSlot) + if err != nil { + return ev, errors.Wrap(err, "could not get head state slot time") + } + ev.Attributer, err = s.computePayloadAttributes(ctx, st, hsr, ev.ProposerIndex, uint64(t.Unix()), randao) + return ev, err } // This event stream is intended to be used by builders and relays. @@ -704,10 +729,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut ctx, cancel := context.WithTimeout(ctx, payloadAttributeTimeout) edc := make(chan asyncPayloadAttrData) go func() { - d := asyncPayloadAttrData{ - version: version.String(ev.HeadState.Version()), - } - + d := asyncPayloadAttrData{} defer func() { edc <- d }() @@ -716,6 +738,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut d.err = errors.Wrap(err, "Could not fill event data") return } + d.version = version.String(ev.HeadBlock.Version()) attributesBytes, err := marshalAttributes(ev.Attributer) if err != nil { d.err = errors.Wrap(err, "errors marshaling payload attributes to json") @@ -738,11 +761,11 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut select { case <-ctx.Done(): log.WithError(ctx.Err()).Warn("Context canceled while waiting for payload attributes event data") - return nil + return newlineReader() case ed := <-edc: if ed.err != nil { log.WithError(ed.err).Warn("Error while marshaling payload attributes event data") - return nil + return newlineReader() } return jsonMarshalReader(PayloadAttributesTopic, &structs.PayloadAttributesEvent{ Version: ed.version, diff --git a/changelog/kasey_event-slot-fix.md b/changelog/kasey_event-slot-fix.md new file mode 100644 index 000000000000..ad5ed0cb6e19 --- /dev/null +++ b/changelog/kasey_event-slot-fix.md @@ -0,0 +1,2 @@ +### Fixed +- Fixed a bug in the event stream handler when processing payload attribute events where the timestamp and slot of the event would be based on the head rather than the current slot. From 8a0832f763438d3d591bc8113f757612fd3102f9 Mon Sep 17 00:00:00 2001 From: Kasey Kirkham Date: Mon, 24 Feb 2025 15:52:25 -0600 Subject: [PATCH 02/10] Skip executing nil lazyReaders --- beacon-chain/rpc/eth/events/events.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 54a483c093b6..992784b20859 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -357,6 +357,10 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea debug.PrintStack() } }() + if lr == nil { + log.Warn("event stream skipping a nil lazy event reader") + return nil + } r := lr() out, err := io.ReadAll(r) if err != nil { From ab12d039d0b537fb3d95050e8d7564c492f70f9c Mon Sep 17 00:00:00 2001 From: james-prysm Date: Mon, 3 Mar 2025 13:36:29 -0600 Subject: [PATCH 03/10] adding in clarifying comments, uncommenting needs fill, adding in happy path unit test for code coverage --- beacon-chain/blockchain/execution_engine.go | 13 ++- beacon-chain/rpc/eth/events/events.go | 10 +- beacon-chain/rpc/eth/events/events_test.go | 104 ++++++++++++++++++++ 3 files changed, 118 insertions(+), 9 deletions(-) diff --git a/beacon-chain/blockchain/execution_engine.go b/beacon-chain/blockchain/execution_engine.go index 2a54be1c7924..5b286ed0d34b 100644 --- a/beacon-chain/blockchain/execution_engine.go +++ b/beacon-chain/blockchain/execution_engine.go @@ -72,7 +72,8 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* if arg.attributes == nil { arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version()) } - go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) + nextSlot := s.CurrentSlot() + 1 + go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), nextSlot) payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes) if err != nil { switch { @@ -152,13 +153,13 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* } // If the forkchoice update call has an attribute, update the payload ID cache. hasAttr := arg.attributes != nil && !arg.attributes.IsEmpty() - nextSlot := s.CurrentSlot() + 1 if hasAttr && payloadID != nil { var pId [8]byte copy(pId[:], payloadID[:]) log.WithFields(logrus.Fields{ "blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(arg.headRoot[:])), "headSlot": headBlk.Slot(), + "nextSlot": nextSlot, "payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])), }).Info("Forkchoice updated with payload attributes for proposal") s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId) @@ -166,15 +167,19 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* log.WithFields(logrus.Fields{ "blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()), "slot": headBlk.Slot(), + "nextSlot": nextSlot, }).Error("Received nil payload ID on VALID engine response") } return payloadID, nil } -func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, slot primitives.Slot) { +func firePayloadAttributesEvent(_ context.Context, f event.SubscriberSender, nextSlot primitives.Slot) { + // the fcu args have differing amounts of completeness based on the code path, + // and there is work we only want to do if a client is actually listening to the events beacon api endpoint. + // temporary solution: just fire a blank event and fill in the details in the api handler. f.Send(&feed.Event{ Type: statefeed.PayloadAttributes, - Data: payloadattribute.EventData{ProposalSlot: slot}, + Data: payloadattribute.EventData{ProposalSlot: nextSlot}, }) } diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 992784b20859..a551b0ba7361 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -669,11 +669,11 @@ func needsFill(ev payloadattribute.EventData) bool { func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) { var err error - /* - if !needsFill(ev) { - return ev, nil - } - */ + + if !needsFill(ev) { + return ev, nil + } + ev.HeadState, err = s.HeadFetcher.HeadState(ctx) if err != nil { return ev, errors.Wrap(err, "Could not get head state") diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 82c970839449..15fdd6d5fe6f 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -2,6 +2,7 @@ package events import ( "context" + "encoding/binary" "fmt" "io" "math" @@ -24,6 +25,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/eth/v1" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/require" @@ -557,6 +559,108 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { }) } +func TestFillEventData(t *testing.T) { + ctx := context.Background() + t.Run("AlreadyFilledData_ShouldShortCircuitWithoutError", func(t *testing.T) { + st, err := util.NewBeaconStateBellatrix() + require.NoError(t, err) + b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockBellatrix(ð.SignedBeaconBlockBellatrix{})) + require.NoError(t, err) + attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{ + Timestamp: uint64(time.Now().Unix()), + }) + alreadyFilled := payloadattribute.EventData{ + HeadState: st, + HeadBlock: b, + HeadRoot: [32]byte{1, 2, 3}, + Attributer: attributor, + ParentBlockRoot: []byte{1, 2, 3}, + ParentBlockHash: []byte{4, 5, 6}, + } + srv := &Server{} // No real HeadFetcher needed here since it won't be called. + result, err := srv.fillEventData(ctx, alreadyFilled) + require.NoError(t, err) + require.DeepEqual(t, alreadyFilled, result) + }) + t.Run("Electra PartialData_ShouldFetchHeadStateAndBlock", func(t *testing.T) { + st, err := util.NewBeaconStateElectra() + require.NoError(t, err) + valCount := 10 + setActiveValidators(t, st, valCount) + inactivityScores := make([]uint64, valCount) + for i := range inactivityScores { + inactivityScores[i] = 10 + } + require.NoError(t, st.SetInactivityScores(inactivityScores)) + b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockElectra(ð.SignedBeaconBlockElectra{})) + require.NoError(t, err) + attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{ + Timestamp: uint64(time.Now().Unix()), + }) + // Create an event data object missing certain fields: + partial := payloadattribute.EventData{ + // The presence of a nil HeadState, nil HeadBlock, zeroed HeadRoot, etc. + // will cause fillEventData to try to fill the values. + ProposalSlot: 42, // different epoch from current slot + Attributer: attributor, // Must be Bellatrix or later + } + currentSlot := primitives.Slot(0) + // to avoid slot processing + require.NoError(t, st.SetSlot(currentSlot+1)) + mockChainService := &mockChain.ChainService{ + Root: make([]byte, 32), + State: st, + Block: b, + Slot: ¤tSlot, + } + + stn := mockChain.NewEventFeedWrapper() + opn := mockChain.NewEventFeedWrapper() + srv := &Server{ + StateNotifier: &mockChain.SimpleNotifier{Feed: stn}, + OperationNotifier: &mockChain.SimpleNotifier{Feed: opn}, + HeadFetcher: mockChainService, + ChainInfoFetcher: mockChainService, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + EventWriteTimeout: testEventWriteTimeout, + } + + filled, err := srv.fillEventData(ctx, partial) + require.NoError(t, err, "expected successful fill of partial event data") + + // Verify that fields have been updated from the mock data: + require.NotNil(t, filled.HeadState, "HeadState should be assigned") + require.NotNil(t, filled.HeadBlock, "HeadBlock should be assigned") + require.NotEqual(t, [32]byte{}, filled.HeadRoot, "HeadRoot should no longer be zero") + require.NotEmpty(t, filled.ParentBlockRoot, "ParentBlockRoot should be filled") + require.NotEmpty(t, filled.ParentBlockHash, "ParentBlockHash should be filled") + require.Equal(t, uint64(0), filled.ParentBlockNumber, "ParentBlockNumber must match mock block") + + // Check that a valid Attributer was set: + require.NotNil(t, filled.Attributer, "Should have a valid payload attributes object") + require.Equal(t, false, filled.Attributer.IsEmpty(), "Attributer should not be empty after fill") + }) +} + +func setActiveValidators(t *testing.T, st state.BeaconState, count int) { + balances := make([]uint64, count) + validators := make([]*eth.Validator, 0, count) + for i := 0; i < count; i++ { + pubKey := make([]byte, params.BeaconConfig().BLSPubkeyLength) + binary.LittleEndian.PutUint64(pubKey, uint64(i)) + balances[i] = uint64(i) + validators = append(validators, ð.Validator{ + PublicKey: pubKey, + ActivationEpoch: 0, + ExitEpoch: params.BeaconConfig().FarFutureEpoch, + WithdrawalCredentials: make([]byte, 32), + }) + } + + require.NoError(t, st.SetValidators(validators)) + require.NoError(t, st.SetBalances(balances)) +} + func TestStuckReaderScenarios(t *testing.T) { cases := []struct { name string From ae18401a4cd6b79a7a1cbb2a524056f13be28517 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Mon, 3 Mar 2025 13:42:24 -0600 Subject: [PATCH 04/10] gaz --- beacon-chain/rpc/eth/events/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index 0a633a87f500..731d70ec272b 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -59,6 +59,7 @@ go_test( "//consensus-types/interfaces:go_default_library", "//consensus-types/payload-attribute:go_default_library", "//consensus-types/primitives:go_default_library", + "//proto/engine/v1:go_default_library", "//proto/eth/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//testing/require:go_default_library", From cae27ad592426640613befd221377ebf3e7dd977 Mon Sep 17 00:00:00 2001 From: james-prysm Date: Mon, 3 Mar 2025 14:00:53 -0600 Subject: [PATCH 05/10] fixing ineffectual assignment --- beacon-chain/rpc/eth/events/events_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 15fdd6d5fe6f..45aa49ee0b6e 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -569,6 +569,7 @@ func TestFillEventData(t *testing.T) { attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{ Timestamp: uint64(time.Now().Unix()), }) + require.NoError(t, err) alreadyFilled := payloadattribute.EventData{ HeadState: st, HeadBlock: b, @@ -597,6 +598,7 @@ func TestFillEventData(t *testing.T) { attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{ Timestamp: uint64(time.Now().Unix()), }) + require.NoError(t, err) // Create an event data object missing certain fields: partial := payloadattribute.EventData{ // The presence of a nil HeadState, nil HeadBlock, zeroed HeadRoot, etc. From 207b2fd5484c725f0fb7903540d7ba7d5e514203 Mon Sep 17 00:00:00 2001 From: Kasey Kirkham Date: Mon, 3 Mar 2025 14:56:01 -0600 Subject: [PATCH 06/10] nil check the Reader coming from the lazyReader --- beacon-chain/rpc/eth/events/events.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index a551b0ba7361..2ced7615682a 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -358,10 +358,14 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea } }() if lr == nil { - log.Warn("event stream skipping a nil lazy event reader") + log.Warn("event stream skipping a nil lazy event reader callback") return nil } r := lr() + if r == nil { + log.Warn("event stream skipping a nil event reader") + return nil + } out, err := io.ReadAll(r) if err != nil { return err @@ -765,11 +769,11 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut select { case <-ctx.Done(): log.WithError(ctx.Err()).Warn("Context canceled while waiting for payload attributes event data") - return newlineReader() + return nil case ed := <-edc: if ed.err != nil { log.WithError(ed.err).Warn("Error while marshaling payload attributes event data") - return newlineReader() + return nil } return jsonMarshalReader(PayloadAttributesTopic, &structs.PayloadAttributesEvent{ Version: ed.version, From 199387284e6006db0ee0bc2ca9a1bd6a82658c2b Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Wed, 5 Mar 2025 15:20:42 -0600 Subject: [PATCH 07/10] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Radek's PR suggestion to fix error/log capitalization. Co-authored-by: RadosÅ‚aw Kapka --- beacon-chain/rpc/eth/events/events.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 2ced7615682a..c69840a9bdba 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -358,12 +358,12 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea } }() if lr == nil { - log.Warn("event stream skipping a nil lazy event reader callback") + log.Warn("Event stream skipping a nil lazy event reader callback") return nil } r := lr() if r == nil { - log.Warn("event stream skipping a nil event reader") + log.Warn("Event stream skipping a nil event reader") return nil } out, err := io.ReadAll(r) @@ -680,23 +680,23 @@ func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventDat ev.HeadState, err = s.HeadFetcher.HeadState(ctx) if err != nil { - return ev, errors.Wrap(err, "Could not get head state") + return ev, errors.Wrap(err, "could not get head state") } ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx) if err != nil { - return ev, errors.Wrap(err, "Could not look up head block") + return ev, errors.Wrap(err, "could not look up head block") } ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot() if err != nil { - return ev, errors.Wrap(err, "Could not compute head block root") + return ev, errors.Wrap(err, "could not compute head block root") } pr := ev.HeadBlock.Block().ParentRoot() ev.ParentBlockRoot = pr[:] hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot() if err != nil { - return ev, errors.Wrap(err, "Could not compute latest block header root") + return ev, errors.Wrap(err, "could not compute latest block header root") } pse := slots.ToEpoch(ev.ProposalSlot) @@ -704,7 +704,7 @@ func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventDat if slots.ToEpoch(st.Slot()) != pse { st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot) if err != nil { - return ev, errors.Wrap(err, "Could not run process blocks on head state into the proposal slot epoch") + return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch") } } ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot) @@ -718,7 +718,7 @@ func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventDat payload, err := ev.HeadBlock.Block().Body().Execution() if err != nil { - return ev, errors.Wrap(err, "Could not get execution payload for head block") + return ev, errors.Wrap(err, "could not get execution payload for head block") } ev.ParentBlockHash = payload.BlockHash() ev.ParentBlockNumber = payload.BlockNumber() From 1e2ca5c303c073fe3ae6b5eee4585f77a3693d14 Mon Sep 17 00:00:00 2001 From: Kasey Kirkham Date: Wed, 5 Mar 2025 15:27:19 -0600 Subject: [PATCH 08/10] Radek feedback --- beacon-chain/rpc/eth/events/events.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index c69840a9bdba..1bb595b8da6c 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -664,8 +664,7 @@ var zeroRoot [32]byte // needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have // all of the checked fields empty, so the logical short circuit should hit immediately. func needsFill(ev payloadattribute.EventData) bool { - return ev.HeadState == nil || ev.HeadState.IsNil() || - ev.HeadState.LatestBlockHeader() == nil || ev.HeadState.LatestBlockHeader() == nil || + return ev.HeadState == nil || ev.HeadState.IsNil() || ev.HeadState.LatestBlockHeader() == nil || ev.HeadBlock == nil || ev.HeadBlock.IsNil() || ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 || ev.Attributer == nil || ev.Attributer.IsEmpty() From 92e139c44c56166c96f30d5899cdfbac022ef902 Mon Sep 17 00:00:00 2001 From: terence tsao Date: Thu, 6 Mar 2025 13:23:39 -0800 Subject: [PATCH 09/10] Move fire payload attribute event to after save head --- beacon-chain/blockchain/execution_engine.go | 3 +-- beacon-chain/blockchain/forkchoice_update_execution.go | 2 ++ 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/beacon-chain/blockchain/execution_engine.go b/beacon-chain/blockchain/execution_engine.go index 5b286ed0d34b..d781feb33992 100644 --- a/beacon-chain/blockchain/execution_engine.go +++ b/beacon-chain/blockchain/execution_engine.go @@ -72,8 +72,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* if arg.attributes == nil { arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version()) } - nextSlot := s.CurrentSlot() + 1 - go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), nextSlot) payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes) if err != nil { switch { @@ -153,6 +151,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* } // If the forkchoice update call has an attribute, update the payload ID cache. hasAttr := arg.attributes != nil && !arg.attributes.IsEmpty() + nextSlot := s.CurrentSlot() + 1 if hasAttr && payloadID != nil { var pId [8]byte copy(pId[:], payloadID[:]) diff --git a/beacon-chain/blockchain/forkchoice_update_execution.go b/beacon-chain/blockchain/forkchoice_update_execution.go index ed749d52c042..4348bfd3ca77 100644 --- a/beacon-chain/blockchain/forkchoice_update_execution.go +++ b/beacon-chain/blockchain/forkchoice_update_execution.go @@ -102,6 +102,8 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo log.WithError(err).Error("could not save head") } + go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) + // Only need to prune attestations from pool if the head has changed. if err := s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock); err != nil { log.WithError(err).Error("could not prune attestations from pool") From d430e296564d72d55e225fb7f1c2027dc9d810af Mon Sep 17 00:00:00 2001 From: Kasey Kirkham Date: Fri, 7 Mar 2025 14:14:11 -0600 Subject: [PATCH 10/10] set mock statenotifier in testServiceOptsWithDB --- beacon-chain/blockchain/mock_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/beacon-chain/blockchain/mock_test.go b/beacon-chain/blockchain/mock_test.go index d77e84e980c1..bbe473ca6811 100644 --- a/beacon-chain/blockchain/mock_test.go +++ b/beacon-chain/blockchain/mock_test.go @@ -3,6 +3,7 @@ package blockchain import ( "testing" + mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" testDB "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" @@ -18,6 +19,7 @@ func testServiceOptsWithDB(t *testing.T) []Option { WithStateGen(stategen.New(beaconDB, fcs)), WithForkChoiceStore(fcs), WithClockSynchronizer(cs), + WithStateNotifier(&mock.MockStateNotifier{RecordEvents: true}), } }