diff --git a/beacon-chain/blockchain/execution_engine.go b/beacon-chain/blockchain/execution_engine.go index 45e29176ec36..d781feb33992 100644 --- a/beacon-chain/blockchain/execution_engine.go +++ b/beacon-chain/blockchain/execution_engine.go @@ -72,7 +72,6 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* if arg.attributes == nil { arg.attributes = payloadattribute.EmptyWithVersion(headBlk.Version()) } - go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), arg) payloadID, lastValidHash, err := s.cfg.ExecutionEngineCaller.ForkchoiceUpdated(ctx, fcs, arg.attributes) if err != nil { switch { @@ -159,6 +158,7 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* log.WithFields(logrus.Fields{ "blockRoot": fmt.Sprintf("%#x", bytesutil.Trunc(arg.headRoot[:])), "headSlot": headBlk.Slot(), + "nextSlot": nextSlot, "payloadID": fmt.Sprintf("%#x", bytesutil.Trunc(payloadID[:])), }).Info("Forkchoice updated with payload attributes for proposal") s.cfg.PayloadIDCache.Set(nextSlot, arg.headRoot, pId) @@ -166,40 +166,19 @@ func (s *Service) notifyForkchoiceUpdate(ctx context.Context, arg *fcuConfig) (* log.WithFields(logrus.Fields{ "blockHash": fmt.Sprintf("%#x", headPayload.BlockHash()), "slot": headBlk.Slot(), + "nextSlot": nextSlot, }).Error("Received nil payload ID on VALID engine response") } return payloadID, nil } -func firePayloadAttributesEvent(ctx context.Context, f event.SubscriberSender, cfg *fcuConfig) { - pidx, err := helpers.BeaconProposerIndex(ctx, cfg.headState) - if err != nil { - log.WithError(err). - WithField("head_root", cfg.headRoot[:]). - Error("Could not get proposer index for PayloadAttributes event") - return - } - evd := payloadattribute.EventData{ - ProposerIndex: pidx, - ProposalSlot: cfg.headState.Slot(), - ParentBlockRoot: cfg.headRoot[:], - Attributer: cfg.attributes, - HeadRoot: cfg.headRoot, - HeadState: cfg.headState, - HeadBlock: cfg.headBlock, - } - if cfg.headBlock != nil && !cfg.headBlock.IsNil() { - headPayload, err := cfg.headBlock.Block().Body().Execution() - if err != nil { - log.WithError(err).Error("Could not get execution payload for head block") - return - } - evd.ParentBlockHash = headPayload.BlockHash() - evd.ParentBlockNumber = headPayload.BlockNumber() - } +func firePayloadAttributesEvent(_ context.Context, f event.SubscriberSender, nextSlot primitives.Slot) { + // the fcu args have differing amounts of completeness based on the code path, + // and there is work we only want to do if a client is actually listening to the events beacon api endpoint. + // temporary solution: just fire a blank event and fill in the details in the api handler. f.Send(&feed.Event{ Type: statefeed.PayloadAttributes, - Data: evd, + Data: payloadattribute.EventData{ProposalSlot: nextSlot}, }) } diff --git a/beacon-chain/blockchain/forkchoice_update_execution.go b/beacon-chain/blockchain/forkchoice_update_execution.go index ed749d52c042..4348bfd3ca77 100644 --- a/beacon-chain/blockchain/forkchoice_update_execution.go +++ b/beacon-chain/blockchain/forkchoice_update_execution.go @@ -102,6 +102,8 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo log.WithError(err).Error("could not save head") } + go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) + // Only need to prune attestations from pool if the head has changed. if err := s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock); err != nil { log.WithError(err).Error("could not prune attestations from pool") diff --git a/beacon-chain/blockchain/mock_test.go b/beacon-chain/blockchain/mock_test.go index d77e84e980c1..bbe473ca6811 100644 --- a/beacon-chain/blockchain/mock_test.go +++ b/beacon-chain/blockchain/mock_test.go @@ -3,6 +3,7 @@ package blockchain import ( "testing" + mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" testDB "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree" "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" @@ -18,6 +19,7 @@ func testServiceOptsWithDB(t *testing.T) []Option { WithStateGen(stategen.New(beaconDB, fcs)), WithForkChoiceStore(fcs), WithClockSynchronizer(cs), + WithStateNotifier(&mock.MockStateNotifier{RecordEvents: true}), } } diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 07a49b6e69ea..a799ce52cee0 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -723,13 +723,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) { attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:]) // return early if we are not proposing next slot if attribute.IsEmpty() { - fcuArgs := &fcuConfig{ - headState: headState, - headRoot: headRoot, - headBlock: nil, - attributes: attribute, - } - go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), fcuArgs) + // notifyForkchoiceUpdate fires the payload attribute event. But in this case, we won't + // call notifyForkchoiceUpdate, so the event is fired here. + go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1) return } diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index 9bbea34e1c06..731d70ec272b 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -18,7 +18,8 @@ go_library( "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", - "//beacon-chain/core/time:go_default_library", + "//beacon-chain/core/transition:go_default_library", + "//beacon-chain/state:go_default_library", "//config/params:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/payload-attribute:go_default_library", @@ -58,6 +59,7 @@ go_test( "//consensus-types/interfaces:go_default_library", "//consensus-types/payload-attribute:go_default_library", "//consensus-types/primitives:go_default_library", + "//proto/engine/v1:go_default_library", "//proto/eth/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//testing/require:go_default_library", diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index b1ba8e2f1702..1bb595b8da6c 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "runtime/debug" "strconv" "time" @@ -20,7 +21,8 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" - chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute" @@ -352,9 +354,18 @@ func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyRea if r := recover(); r != nil { log.WithField("panic", r).Error("Recovered from panic while writing event to client.") err = errWriterUnusable + debug.PrintStack() } }() + if lr == nil { + log.Warn("Event stream skipping a nil lazy event reader callback") + return nil + } r := lr() + if r == nil { + log.Warn("Event stream skipping a nil event reader") + return nil + } out, err := io.ReadAll(r) if err != nil { return err @@ -600,27 +611,14 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi var errUnsupportedPayloadAttribute = errors.New("cannot compute payload attributes pre-Bellatrix") -func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.Attributer, error) { - v := ev.HeadState.Version() +func (s *Server) computePayloadAttributes(ctx context.Context, st state.ReadOnlyBeaconState, root [32]byte, proposer primitives.ValidatorIndex, timestamp uint64, randao []byte) (payloadattribute.Attributer, error) { + v := st.Version() if v < version.Bellatrix { return nil, errors.Wrapf(errUnsupportedPayloadAttribute, "%s is not supported", version.String(v)) } - t, err := slots.ToTime(ev.HeadState.GenesisTime(), ev.HeadState.Slot()) - if err != nil { - return nil, errors.Wrap(err, "could not get head state slot time") - } - timestamp := uint64(t.Unix()) - prevRando, err := helpers.RandaoMix(ev.HeadState, chaintime.CurrentEpoch(ev.HeadState)) - if err != nil { - return nil, errors.Wrap(err, "could not get head state randao mix") - } - proposerIndex, err := helpers.BeaconProposerIndex(ctx, ev.HeadState) - if err != nil { - return nil, errors.Wrap(err, "could not get head state proposer index") - } feeRecpt := params.BeaconConfig().DefaultFeeRecipient.Bytes() - tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex) + tValidator, exists := s.TrackedValidatorsCache.Validator(proposer) if exists { feeRecpt = tValidator.FeeRecipient[:] } @@ -628,34 +626,30 @@ func (s *Server) computePayloadAttributes(ctx context.Context, ev payloadattribu if v == version.Bellatrix { return payloadattribute.New(&engine.PayloadAttributes{ Timestamp: timestamp, - PrevRandao: prevRando, + PrevRandao: randao, SuggestedFeeRecipient: feeRecpt, }) } - w, _, err := ev.HeadState.ExpectedWithdrawals() + w, _, err := st.ExpectedWithdrawals() if err != nil { return nil, errors.Wrap(err, "could not get withdrawals from head state") } if v == version.Capella { return payloadattribute.New(&engine.PayloadAttributesV2{ Timestamp: timestamp, - PrevRandao: prevRando, + PrevRandao: randao, SuggestedFeeRecipient: feeRecpt, Withdrawals: w, }) } - pr, err := ev.HeadBlock.Block().HashTreeRoot() - if err != nil { - return nil, errors.Wrap(err, "could not compute head block root") - } return payloadattribute.New(&engine.PayloadAttributesV3{ Timestamp: timestamp, - PrevRandao: prevRando, + PrevRandao: randao, SuggestedFeeRecipient: feeRecpt, Withdrawals: w, - ParentBeaconBlockRoot: pr[:], + ParentBeaconBlockRoot: root[:], }) } @@ -665,37 +659,75 @@ type asyncPayloadAttrData struct { err error } +var zeroRoot [32]byte + +// needsFill allows tests to provide filled EventData values. An ordinary event data value fired by the blockchain package will have +// all of the checked fields empty, so the logical short circuit should hit immediately. +func needsFill(ev payloadattribute.EventData) bool { + return ev.HeadState == nil || ev.HeadState.IsNil() || ev.HeadState.LatestBlockHeader() == nil || + ev.HeadBlock == nil || ev.HeadBlock.IsNil() || + ev.HeadRoot == zeroRoot || len(ev.ParentBlockRoot) == 0 || len(ev.ParentBlockHash) == 0 || + ev.Attributer == nil || ev.Attributer.IsEmpty() +} + func (s *Server) fillEventData(ctx context.Context, ev payloadattribute.EventData) (payloadattribute.EventData, error) { - if ev.HeadBlock == nil || ev.HeadBlock.IsNil() { - hb, err := s.HeadFetcher.HeadBlock(ctx) - if err != nil { - return ev, errors.Wrap(err, "Could not look up head block") - } - root, err := hb.Block().HashTreeRoot() - if err != nil { - return ev, errors.Wrap(err, "Could not compute head block root") - } - if ev.HeadRoot != root { - return ev, errors.Wrap(err, "head root changed before payload attribute event handler execution") - } - ev.HeadBlock = hb - payload, err := hb.Block().Body().Execution() - if err != nil { - return ev, errors.Wrap(err, "Could not get execution payload for head block") - } - ev.ParentBlockHash = payload.BlockHash() - ev.ParentBlockNumber = payload.BlockNumber() + var err error + + if !needsFill(ev) { + return ev, nil } - attr := ev.Attributer - if attr == nil || attr.IsEmpty() { - attr, err := s.computePayloadAttributes(ctx, ev) + ev.HeadState, err = s.HeadFetcher.HeadState(ctx) + if err != nil { + return ev, errors.Wrap(err, "could not get head state") + } + + ev.HeadBlock, err = s.HeadFetcher.HeadBlock(ctx) + if err != nil { + return ev, errors.Wrap(err, "could not look up head block") + } + ev.HeadRoot, err = ev.HeadBlock.Block().HashTreeRoot() + if err != nil { + return ev, errors.Wrap(err, "could not compute head block root") + } + pr := ev.HeadBlock.Block().ParentRoot() + ev.ParentBlockRoot = pr[:] + + hsr, err := ev.HeadState.LatestBlockHeader().HashTreeRoot() + if err != nil { + return ev, errors.Wrap(err, "could not compute latest block header root") + } + + pse := slots.ToEpoch(ev.ProposalSlot) + st := ev.HeadState + if slots.ToEpoch(st.Slot()) != pse { + st, err = transition.ProcessSlotsUsingNextSlotCache(ctx, st, hsr[:], ev.ProposalSlot) if err != nil { - return ev, errors.Wrap(err, "Could not compute payload attributes") + return ev, errors.Wrap(err, "could not run process blocks on head state into the proposal slot epoch") } - ev.Attributer = attr } - return ev, nil + ev.ProposerIndex, err = helpers.BeaconProposerIndexAtSlot(ctx, st, ev.ProposalSlot) + if err != nil { + return ev, errors.Wrap(err, "failed to compute proposer index") + } + randao, err := helpers.RandaoMix(st, pse) + if err != nil { + return ev, errors.Wrap(err, "could not get head state randado") + } + + payload, err := ev.HeadBlock.Block().Body().Execution() + if err != nil { + return ev, errors.Wrap(err, "could not get execution payload for head block") + } + ev.ParentBlockHash = payload.BlockHash() + ev.ParentBlockNumber = payload.BlockNumber() + + t, err := slots.ToTime(st.GenesisTime(), ev.ProposalSlot) + if err != nil { + return ev, errors.Wrap(err, "could not get head state slot time") + } + ev.Attributer, err = s.computePayloadAttributes(ctx, st, hsr, ev.ProposerIndex, uint64(t.Unix()), randao) + return ev, err } // This event stream is intended to be used by builders and relays. @@ -704,10 +736,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut ctx, cancel := context.WithTimeout(ctx, payloadAttributeTimeout) edc := make(chan asyncPayloadAttrData) go func() { - d := asyncPayloadAttrData{ - version: version.String(ev.HeadState.Version()), - } - + d := asyncPayloadAttrData{} defer func() { edc <- d }() @@ -716,6 +745,7 @@ func (s *Server) payloadAttributesReader(ctx context.Context, ev payloadattribut d.err = errors.Wrap(err, "Could not fill event data") return } + d.version = version.String(ev.HeadBlock.Version()) attributesBytes, err := marshalAttributes(ev.Attributer) if err != nil { d.err = errors.Wrap(err, "errors marshaling payload attributes to json") diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 82c970839449..45aa49ee0b6e 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -2,6 +2,7 @@ package events import ( "context" + "encoding/binary" "fmt" "io" "math" @@ -24,6 +25,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" payloadattribute "github.com/prysmaticlabs/prysm/v5/consensus-types/payload-attribute" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/eth/v1" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/require" @@ -557,6 +559,110 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { }) } +func TestFillEventData(t *testing.T) { + ctx := context.Background() + t.Run("AlreadyFilledData_ShouldShortCircuitWithoutError", func(t *testing.T) { + st, err := util.NewBeaconStateBellatrix() + require.NoError(t, err) + b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockBellatrix(ð.SignedBeaconBlockBellatrix{})) + require.NoError(t, err) + attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{ + Timestamp: uint64(time.Now().Unix()), + }) + require.NoError(t, err) + alreadyFilled := payloadattribute.EventData{ + HeadState: st, + HeadBlock: b, + HeadRoot: [32]byte{1, 2, 3}, + Attributer: attributor, + ParentBlockRoot: []byte{1, 2, 3}, + ParentBlockHash: []byte{4, 5, 6}, + } + srv := &Server{} // No real HeadFetcher needed here since it won't be called. + result, err := srv.fillEventData(ctx, alreadyFilled) + require.NoError(t, err) + require.DeepEqual(t, alreadyFilled, result) + }) + t.Run("Electra PartialData_ShouldFetchHeadStateAndBlock", func(t *testing.T) { + st, err := util.NewBeaconStateElectra() + require.NoError(t, err) + valCount := 10 + setActiveValidators(t, st, valCount) + inactivityScores := make([]uint64, valCount) + for i := range inactivityScores { + inactivityScores[i] = 10 + } + require.NoError(t, st.SetInactivityScores(inactivityScores)) + b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlockElectra(ð.SignedBeaconBlockElectra{})) + require.NoError(t, err) + attributor, err := payloadattribute.New(&enginev1.PayloadAttributes{ + Timestamp: uint64(time.Now().Unix()), + }) + require.NoError(t, err) + // Create an event data object missing certain fields: + partial := payloadattribute.EventData{ + // The presence of a nil HeadState, nil HeadBlock, zeroed HeadRoot, etc. + // will cause fillEventData to try to fill the values. + ProposalSlot: 42, // different epoch from current slot + Attributer: attributor, // Must be Bellatrix or later + } + currentSlot := primitives.Slot(0) + // to avoid slot processing + require.NoError(t, st.SetSlot(currentSlot+1)) + mockChainService := &mockChain.ChainService{ + Root: make([]byte, 32), + State: st, + Block: b, + Slot: ¤tSlot, + } + + stn := mockChain.NewEventFeedWrapper() + opn := mockChain.NewEventFeedWrapper() + srv := &Server{ + StateNotifier: &mockChain.SimpleNotifier{Feed: stn}, + OperationNotifier: &mockChain.SimpleNotifier{Feed: opn}, + HeadFetcher: mockChainService, + ChainInfoFetcher: mockChainService, + TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), + EventWriteTimeout: testEventWriteTimeout, + } + + filled, err := srv.fillEventData(ctx, partial) + require.NoError(t, err, "expected successful fill of partial event data") + + // Verify that fields have been updated from the mock data: + require.NotNil(t, filled.HeadState, "HeadState should be assigned") + require.NotNil(t, filled.HeadBlock, "HeadBlock should be assigned") + require.NotEqual(t, [32]byte{}, filled.HeadRoot, "HeadRoot should no longer be zero") + require.NotEmpty(t, filled.ParentBlockRoot, "ParentBlockRoot should be filled") + require.NotEmpty(t, filled.ParentBlockHash, "ParentBlockHash should be filled") + require.Equal(t, uint64(0), filled.ParentBlockNumber, "ParentBlockNumber must match mock block") + + // Check that a valid Attributer was set: + require.NotNil(t, filled.Attributer, "Should have a valid payload attributes object") + require.Equal(t, false, filled.Attributer.IsEmpty(), "Attributer should not be empty after fill") + }) +} + +func setActiveValidators(t *testing.T, st state.BeaconState, count int) { + balances := make([]uint64, count) + validators := make([]*eth.Validator, 0, count) + for i := 0; i < count; i++ { + pubKey := make([]byte, params.BeaconConfig().BLSPubkeyLength) + binary.LittleEndian.PutUint64(pubKey, uint64(i)) + balances[i] = uint64(i) + validators = append(validators, ð.Validator{ + PublicKey: pubKey, + ActivationEpoch: 0, + ExitEpoch: params.BeaconConfig().FarFutureEpoch, + WithdrawalCredentials: make([]byte, 32), + }) + } + + require.NoError(t, st.SetValidators(validators)) + require.NoError(t, st.SetBalances(balances)) +} + func TestStuckReaderScenarios(t *testing.T) { cases := []struct { name string diff --git a/changelog/kasey_event-slot-fix.md b/changelog/kasey_event-slot-fix.md new file mode 100644 index 000000000000..ad5ed0cb6e19 --- /dev/null +++ b/changelog/kasey_event-slot-fix.md @@ -0,0 +1,2 @@ +### Fixed +- Fixed a bug in the event stream handler when processing payload attribute events where the timestamp and slot of the event would be based on the head rather than the current slot.