Skip to content

Commit d7efccf

Browse files
james-prysmrkapka
andauthored
single attestation cleanup (#14984)
* some cleanup and minor bug fix * adding some comments back in * Update beacon-chain/sync/pending_attestations_queue.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/pending_attestations_queue.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/validate_beacon_attestation.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update beacon-chain/sync/pending_attestations_queue.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * adding comment back in * linting * fixing committeeIndiciesSLot * fixing changelog --------- Co-authored-by: Radosław Kapka <rkapka@wp.pl>
1 parent 334920b commit d7efccf

3 files changed

+87
-70
lines changed

beacon-chain/sync/pending_attestations_queue.go

+38-35
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,7 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
140140
data := att.GetData()
141141

142142
// This is an important validation before retrieving attestation pre state to defend against
143-
// attestation's target intentionally reference checkpoint that's long ago.
144-
// Verify current finalized checkpoint is an ancestor of the block defined by the attestation's beacon block root.
143+
// attestation's target intentionally referencing a checkpoint that's long ago.
145144
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
146145
log.WithError(blockchain.ErrNotDescendantOfFinalized).Debug("Could not verify finalized consistency")
147146
return
@@ -169,70 +168,74 @@ func (s *Service) processUnaggregated(ctx context.Context, att ethpb.Att) {
169168
return
170169
}
171170

172-
var singleAtt *ethpb.SingleAttestation
171+
// Decide if the attestation is an Electra SingleAttestation or a Phase0 unaggregated attestation
172+
var (
173+
attForValidation ethpb.Att
174+
broadcastAtt ethpb.Att
175+
eventType feed.EventType
176+
eventData interface{}
177+
)
178+
173179
if att.Version() >= version.Electra {
174-
var ok bool
175-
singleAtt, ok = att.(*ethpb.SingleAttestation)
180+
singleAtt, ok := att.(*ethpb.SingleAttestation)
176181
if !ok {
177182
log.Debugf("Attestation has wrong type (expected %T, got %T)", &ethpb.SingleAttestation{}, att)
178183
return
179184
}
180-
att = singleAtt.ToAttestationElectra(committee)
185+
// Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions.
186+
attForValidation = singleAtt.ToAttestationElectra(committee)
187+
broadcastAtt = singleAtt
188+
eventType = operation.SingleAttReceived
189+
eventData = &operation.SingleAttReceivedData{
190+
Attestation: singleAtt,
191+
}
192+
} else {
193+
// Phase0 attestation
194+
attForValidation = att
195+
broadcastAtt = att
196+
eventType = operation.UnaggregatedAttReceived
197+
eventData = &operation.UnAggregatedAttReceivedData{
198+
Attestation: att,
199+
}
181200
}
182201

183-
valid, err = s.validateUnaggregatedAttWithState(ctx, att, preState)
202+
valid, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
184203
if err != nil {
185204
log.WithError(err).Debug("Pending unaggregated attestation failed validation")
186205
return
187206
}
207+
188208
if valid == pubsub.ValidationAccept {
189209
if features.Get().EnableExperimentalAttestationPool {
190-
if err = s.cfg.attestationCache.Add(att); err != nil {
210+
if err = s.cfg.attestationCache.Add(attForValidation); err != nil {
191211
log.WithError(err).Debug("Could not save unaggregated attestation")
192212
return
193213
}
194214
} else {
195-
if err := s.cfg.attPool.SaveUnaggregatedAttestation(att); err != nil {
215+
if err := s.cfg.attPool.SaveUnaggregatedAttestation(attForValidation); err != nil {
196216
log.WithError(err).Debug("Could not save unaggregated attestation")
197217
return
198218
}
199219
}
200-
s.setSeenCommitteeIndicesSlot(data.Slot, att.GetCommitteeIndex(), att.GetAggregationBits())
220+
221+
s.setSeenCommitteeIndicesSlot(data.Slot, attForValidation.GetCommitteeIndex(), attForValidation.GetAggregationBits())
201222

202223
valCount, err := helpers.ActiveValidatorCount(ctx, preState, slots.ToEpoch(data.Slot))
203224
if err != nil {
204225
log.WithError(err).Debug("Could not retrieve active validator count")
205226
return
206227
}
207228

208-
// Broadcasting the signed attestation again once a node is able to process it.
209-
var attToBroadcast ethpb.Att
210-
if singleAtt != nil {
211-
attToBroadcast = singleAtt
212-
} else {
213-
attToBroadcast = att
214-
}
215-
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, attToBroadcast), attToBroadcast); err != nil {
229+
// Broadcast the final 'broadcastAtt' object
230+
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, broadcastAtt), broadcastAtt); err != nil {
216231
log.WithError(err).Debug("Could not broadcast")
217232
}
218233

219-
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
220-
// of a received unaggregated attestation.
221-
if singleAtt != nil {
222-
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
223-
Type: operation.SingleAttReceived,
224-
Data: &operation.SingleAttReceivedData{
225-
Attestation: singleAtt,
226-
},
227-
})
228-
} else {
229-
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
230-
Type: operation.UnaggregatedAttReceived,
231-
Data: &operation.UnAggregatedAttReceivedData{
232-
Attestation: att,
233-
},
234-
})
235-
}
234+
// Feed event notification for other services
235+
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
236+
Type: eventType,
237+
Data: eventData,
238+
})
236239
}
237240
}
238241

beacon-chain/sync/validate_beacon_attestation.go

+45-35
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ import (
3434
// - The attestation is unaggregated -- that is, it has exactly one participating validator (len(get_attesting_indices(state, attestation.data, attestation.aggregation_bits)) == 1).
3535
// - attestation.data.slot is within the last ATTESTATION_PROPAGATION_SLOT_RANGE slots (attestation.data.slot + ATTESTATION_PROPAGATION_SLOT_RANGE >= current_slot >= attestation.data.slot).
3636
// - The signature of attestation is valid.
37-
func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
37+
func (s *Service) validateCommitteeIndexBeaconAttestation(
38+
ctx context.Context,
39+
pid peer.ID,
40+
msg *pubsub.Message,
41+
) (pubsub.ValidationResult, error) {
3842
if pid == s.cfg.p2p.PeerID() {
3943
return pubsub.ValidationAccept, nil
4044
}
@@ -64,6 +68,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
6468
if err := helpers.ValidateNilAttestation(att); err != nil {
6569
return pubsub.ValidationReject, err
6670
}
71+
6772
data := att.GetData()
6873

6974
// Do not process slot 0 attestations.
@@ -73,8 +78,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
7378

7479
// Attestation's slot is within ATTESTATION_PROPAGATION_SLOT_RANGE and early attestation
7580
// processing tolerance.
76-
if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(),
77-
earlyAttestationProcessingTolerance); err != nil {
81+
if err := helpers.ValidateAttestationTime(data.Slot, s.cfg.clock.GenesisTime(), earlyAttestationProcessingTolerance); err != nil {
7882
tracing.AnnotateError(span, err)
7983
return pubsub.ValidationIgnore, err
8084
}
@@ -89,7 +93,6 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
8993
if s.hasSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits()) {
9094
return pubsub.ValidationIgnore, nil
9195
}
92-
9396
// Reject an attestation if it references an invalid block.
9497
if s.hasBadBlock(bytesutil.ToBytes32(data.BeaconBlockRoot)) ||
9598
s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) ||
@@ -99,15 +102,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
99102
}
100103
}
101104

102-
var validationRes pubsub.ValidationResult
103-
104105
// Verify the block being voted and the processed state is in beaconDB and the block has passed validation if it's in the beaconDB.
105106
blockRoot := bytesutil.ToBytes32(data.BeaconBlockRoot)
106107
if !s.hasBlockAndState(ctx, blockRoot) {
107108
return s.saveToPendingAttPool(att)
108109
}
109-
110-
if !s.cfg.chain.InForkchoice(bytesutil.ToBytes32(data.BeaconBlockRoot)) {
110+
if !s.cfg.chain.InForkchoice(blockRoot) {
111111
tracing.AnnotateError(span, blockchain.ErrNotDescendantOfFinalized)
112112
return pubsub.ValidationIgnore, blockchain.ErrNotDescendantOfFinalized
113113
}
@@ -123,12 +123,12 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
123123
return pubsub.ValidationIgnore, err
124124
}
125125

126-
validationRes, err = s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
126+
validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
127127
if validationRes != pubsub.ValidationAccept {
128128
return validationRes, err
129129
}
130130

131-
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, att.GetData().Slot, committeeIndex)
131+
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, committeeIndex)
132132
if err != nil {
133133
tracing.AnnotateError(span, err)
134134
return pubsub.ValidationIgnore, err
@@ -139,16 +139,37 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
139139
return validationRes, err
140140
}
141141

142-
var singleAtt *eth.SingleAttestation
142+
// Consolidated handling of Electra SingleAttestation vs Phase0 unaggregated attestation
143+
var (
144+
attForValidation eth.Att // what we'll pass to further validation
145+
eventType feed.EventType
146+
eventData interface{}
147+
)
148+
143149
if att.Version() >= version.Electra {
144-
singleAtt, ok = att.(*eth.SingleAttestation)
150+
singleAtt, ok := att.(*eth.SingleAttestation)
145151
if !ok {
146-
return pubsub.ValidationIgnore, fmt.Errorf("attestation has wrong type (expected %T, got %T)", &eth.SingleAttestation{}, att)
152+
return pubsub.ValidationIgnore, fmt.Errorf(
153+
"attestation has wrong type (expected %T, got %T)",
154+
&eth.SingleAttestation{}, att,
155+
)
156+
}
157+
// Convert Electra SingleAttestation to unaggregated ElectraAttestation. This is needed because many parts of the codebase assume that attestations have a certain structure and SingleAttestation validates these assumptions.
158+
attForValidation = singleAtt.ToAttestationElectra(committee)
159+
eventType = operation.SingleAttReceived
160+
eventData = &operation.SingleAttReceivedData{
161+
Attestation: singleAtt,
162+
}
163+
} else {
164+
// Phase0 unaggregated attestation
165+
attForValidation = att
166+
eventType = operation.UnaggregatedAttReceived
167+
eventData = &operation.UnAggregatedAttReceivedData{
168+
Attestation: att,
147169
}
148-
att = singleAtt.ToAttestationElectra(committee)
149170
}
150171

151-
validationRes, err = s.validateUnaggregatedAttWithState(ctx, att, preState)
172+
validationRes, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
152173
if validationRes != pubsub.ValidationAccept {
153174
return validationRes, err
154175
}
@@ -172,7 +193,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
172193
tracing.AnnotateError(span, err)
173194
return
174195
}
175-
indexedAtt, err := attestation.ConvertToIndexed(ctx, att, committee)
196+
indexedAtt, err := attestation.ConvertToIndexed(ctx, attForValidation, committee)
176197
if err != nil {
177198
log.WithError(err).Error("Could not convert to indexed attestation")
178199
tracing.AnnotateError(span, err)
@@ -182,27 +203,16 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
182203
}()
183204
}
184205

185-
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
186-
// of a received unaggregated attestation.
187-
if singleAtt != nil {
188-
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
189-
Type: operation.SingleAttReceived,
190-
Data: &operation.SingleAttReceivedData{
191-
Attestation: singleAtt,
192-
},
193-
})
194-
} else {
195-
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
196-
Type: operation.UnaggregatedAttReceived,
197-
Data: &operation.UnAggregatedAttReceivedData{
198-
Attestation: att,
199-
},
200-
})
201-
}
206+
// Notify other services in the beacon node
207+
s.cfg.attestationNotifier.OperationFeed().Send(&feed.Event{
208+
Type: eventType,
209+
Data: eventData,
210+
})
202211

203-
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, att.GetAggregationBits())
212+
s.setSeenCommitteeIndicesSlot(data.Slot, committeeIndex, attForValidation.GetAggregationBits())
204213

205-
msg.ValidatorData = att
214+
// Attach final validated attestation to the message for further pipeline use
215+
msg.ValidatorData = attForValidation
206216

207217
return pubsub.ValidationAccept, nil
208218
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
### Ignored
2+
3+
- Cleanup single attestation code for readability.
4+

0 commit comments

Comments
 (0)