@@ -12,6 +12,8 @@ import (
12
12
13
13
"github.com/ethereum/go-ethereum/common/hexutil"
14
14
"github.com/pkg/errors"
15
+ "github.com/prometheus/client_golang/prometheus"
16
+ "github.com/prometheus/client_golang/prometheus/promauto"
15
17
"github.com/prysmaticlabs/prysm/v5/api"
16
18
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
17
19
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
76
78
errWriterUnusable = errors .New ("http response writer is unusable" )
77
79
)
78
80
81
+ var httpSSEErrorCount = promauto .NewCounterVec (
82
+ prometheus.CounterOpts {
83
+ Name : "http_sse_error_count" ,
84
+ Help : "Total HTTP errors for server sent events endpoint" ,
85
+ },
86
+ []string {"endpoint" , "error" },
87
+ )
88
+
79
89
// The eventStreamer uses lazyReaders to defer serialization until the moment the value is ready to be written to the client.
80
90
type lazyReader func () io.Reader
81
91
@@ -145,6 +155,13 @@ func newTopicRequest(topics []string) (*topicRequest, error) {
145
155
// Servers may send SSE comments beginning with ':' for any purpose,
146
156
// including to keep the event stream connection alive in the presence of proxy servers.
147
157
func (s * Server ) StreamEvents (w http.ResponseWriter , r * http.Request ) {
158
+ var err error
159
+ defer func () {
160
+ if err != nil {
161
+ httpSSEErrorCount .WithLabelValues (r .URL .Path , err .Error ()).Inc ()
162
+ }
163
+ }()
164
+
148
165
log .Debug ("Starting StreamEvents handler" )
149
166
ctx , span := trace .StartSpan (r .Context (), "events.StreamEvents" )
150
167
defer span .End ()
@@ -174,7 +191,7 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
174
191
defer cancel ()
175
192
es := newEventStreamer (buffSize , ka )
176
193
177
- go es .outboxWriteLoop (ctx , cancel , sw )
194
+ go es .outboxWriteLoop (ctx , cancel , sw , r . URL . Path )
178
195
if err := es .recvEventLoop (ctx , cancel , topics , s ); err != nil {
179
196
log .WithError (err ).Debug ("Shutting down StreamEvents handler." )
180
197
}
@@ -264,11 +281,12 @@ func newlineReader() io.Reader {
264
281
265
282
// outboxWriteLoop runs in a separate goroutine. Its job is to write the values in the outbox to
266
283
// the client as fast as the client can read them.
267
- func (es * eventStreamer ) outboxWriteLoop (ctx context.Context , cancel context.CancelFunc , w * streamingResponseWriterController ) {
284
+ func (es * eventStreamer ) outboxWriteLoop (ctx context.Context , cancel context.CancelFunc , w * streamingResponseWriterController , endpoint string ) {
268
285
var err error
269
286
defer func () {
270
287
if err != nil {
271
288
log .WithError (err ).Debug ("Event streamer shutting down due to error." )
289
+ httpSSEErrorCount .WithLabelValues (endpoint , err .Error ()).Inc ()
272
290
}
273
291
es .exit ()
274
292
}()
0 commit comments