Skip to content

Commit 26503cf

Browse files
authored
Drop task on serialization error (#3803)
1 parent bd06826 commit 26503cf

File tree

6 files changed

+116
-43
lines changed

6 files changed

+116
-43
lines changed

common/metrics/metric_defs.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1433,11 +1433,12 @@ var (
14331433
TaskNotActiveCounter = NewCounterDef("task_errors_not_active_counter")
14341434
TaskLimitExceededCounter = NewCounterDef("task_errors_limit_exceeded_counter")
14351435
TaskNamespaceHandoverCounter = NewCounterDef("task_errors_namespace_handover")
1436+
TaskThrottledCounter = NewCounterDef("task_errors_throttled")
1437+
TaskCorruptionCounter = NewCounterDef("task_errors_corruption")
14361438
TaskScheduleToStartLatency = NewTimerDef("task_schedule_to_start_latency")
14371439
TransferTaskMissingEventCounter = NewCounterDef("transfer_task_missing_event_counter")
14381440
TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter")
14391441
TaskReschedulerPendingTasks = NewDimensionlessHistogramDef("task_rescheduler_pending_tasks")
1440-
TaskThrottledCounter = NewCounterDef("task_throttled_counter")
14411442
PendingTasksCounter = NewDimensionlessHistogramDef("pending_tasks")
14421443
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
14431444
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")

common/persistence/data_blob.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -25,22 +25,21 @@
2525
package persistence
2626

2727
import (
28-
"fmt"
29-
3028
commonpb "go.temporal.io/api/common/v1"
3129
enumspb "go.temporal.io/api/enums/v1"
3230
)
3331

3432
// NewDataBlob returns a new DataBlob
33+
// TODO: return an UnknownEncodingType error with the actual type string when encodingTypeStr is invalid
3534
func NewDataBlob(data []byte, encodingTypeStr string) *commonpb.DataBlob {
3635
if len(data) == 0 {
3736
return nil
3837
}
3938

4039
encodingType, ok := enumspb.EncodingType_value[encodingTypeStr]
41-
if !ok || (enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_PROTO3 &&
42-
enumspb.EncodingType(encodingType) != enumspb.ENCODING_TYPE_JSON) {
43-
panic(fmt.Sprintf("Invalid encoding: %v", encodingTypeStr))
40+
if !ok {
41+
// encodingTypeStr not valid, an error will be returned on deserialization
42+
encodingType = int32(enumspb.ENCODING_TYPE_UNSPECIFIED)
4443
}
4544

4645
return &commonpb.DataBlob{

common/persistence/serialization/blob.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
package serialization
2626

2727
import (
28-
"fmt"
29-
3028
"github.com/gogo/protobuf/proto"
3129
commonpb "go.temporal.io/api/common/v1"
3230
enumspb "go.temporal.io/api/enums/v1"
@@ -137,7 +135,7 @@ func encode(
137135
case enumspb.ENCODING_TYPE_PROTO3:
138136
return proto3Encode(object)
139137
default:
140-
return commonpb.DataBlob{}, fmt.Errorf("unknown encoding type: %v", encoding)
138+
return commonpb.DataBlob{}, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
141139
}
142140
}
143141

@@ -156,27 +154,27 @@ func decode(
156154
case enumspb.ENCODING_TYPE_PROTO3:
157155
return proto3Decode(blob, encoding, result)
158156
default:
159-
return fmt.Errorf("unknown encoding type: %v", encoding)
157+
return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
160158
}
161159
}
162160

163161
func proto3Encode(m proto.Message) (commonpb.DataBlob, error) {
164162
blob := commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3}
165163
data, err := proto.Marshal(m)
166164
if err != nil {
167-
return blob, fmt.Errorf("error serializing struct to blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err)
165+
return blob, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
168166
}
169167
blob.Data = data
170168
return blob, nil
171169
}
172170

173171
func proto3Decode(blob []byte, encoding string, result proto.Message) error {
174172
if e, ok := enumspb.EncodingType_value[encoding]; !ok || enumspb.EncodingType(e) != enumspb.ENCODING_TYPE_PROTO3 {
175-
return fmt.Errorf("encoding %s doesn't match expected encoding %v", encoding, enumspb.ENCODING_TYPE_PROTO3)
173+
return NewUnknownEncodingTypeError(encoding, enumspb.ENCODING_TYPE_PROTO3)
176174
}
177175

178176
if err := proto.Unmarshal(blob, result); err != nil {
179-
return fmt.Errorf("error deserializing blob using %v encoding: %w", enumspb.ENCODING_TYPE_PROTO3, err)
177+
return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err)
180178
}
181179
return nil
182180
}

common/persistence/serialization/serializer.go

+68-25
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
package serialization
2626

2727
import (
28+
"errors"
2829
"fmt"
2930
"reflect"
31+
"strings"
3032

3133
"github.com/gogo/protobuf/proto"
3234
commonpb "go.temporal.io/api/common/v1"
@@ -107,17 +109,20 @@ type (
107109

108110
// SerializationError is an error type for serialization
109111
SerializationError struct {
110-
msg string
112+
encodingType enumspb.EncodingType
113+
wrappedErr error
111114
}
112115

113116
// DeserializationError is an error type for deserialization
114117
DeserializationError struct {
115-
msg string
118+
encodingType enumspb.EncodingType
119+
wrappedErr error
116120
}
117121

118122
// UnknownEncodingTypeError is an error type for unknown or unsupported encoding type
119123
UnknownEncodingTypeError struct {
120-
encodingType enumspb.EncodingType
124+
encodingTypeStr string
125+
expectedEncodingStr []string
121126
}
122127

123128
serializerImpl struct {
@@ -149,7 +154,7 @@ func (t *serializerImpl) DeserializeEvents(data *commonpb.DataBlob) ([]*historyp
149154
// Client API currently specifies encodingType on requests which span multiple of these objects
150155
err = events.Unmarshal(data.Data)
151156
default:
152-
return nil, NewDeserializationError("DeserializeEvents invalid encoding")
157+
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
153158
}
154159
if err != nil {
155160
return nil, err
@@ -179,7 +184,7 @@ func (t *serializerImpl) DeserializeEvent(data *commonpb.DataBlob) (*historypb.H
179184
// Client API currently specifies encodingType on requests which span multiple of these objects
180185
err = event.Unmarshal(data.Data)
181186
default:
182-
return nil, NewDeserializationError("DeserializeEvent invalid encoding")
187+
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
183188
}
184189

185190
if err != nil {
@@ -212,7 +217,7 @@ func (t *serializerImpl) DeserializeClusterMetadata(data *commonpb.DataBlob) (*p
212217
// Client API currently specifies encodingType on requests which span multiple of these objects
213218
err = cm.Unmarshal(data.Data)
214219
default:
215-
return nil, NewDeserializationError("DeserializeClusterMetadata invalid encoding")
220+
return nil, NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
216221
}
217222

218223
if err != nil {
@@ -235,11 +240,11 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod
235240
// Client API currently specifies encodingType on requests which span multiple of these objects
236241
data, err = p.Marshal()
237242
default:
238-
return nil, NewUnknownEncodingTypeError(encodingType)
243+
return nil, NewUnknownEncodingTypeError(encodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
239244
}
240245

241246
if err != nil {
242-
return nil, NewSerializationError(err.Error())
247+
return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
243248
}
244249

245250
// Shouldn't happen, but keeping
@@ -254,30 +259,68 @@ func (t *serializerImpl) serialize(p proto.Marshaler, encodingType enumspb.Encod
254259
}
255260

256261
// NewUnknownEncodingTypeError returns a new instance of encoding type error
257-
func NewUnknownEncodingTypeError(encodingType enumspb.EncodingType) error {
258-
return &UnknownEncodingTypeError{encodingType: encodingType}
262+
func NewUnknownEncodingTypeError(
263+
encodingTypeStr string,
264+
expectedEncoding ...enumspb.EncodingType,
265+
) error {
266+
if len(expectedEncoding) == 0 {
267+
for encodingType := range enumspb.EncodingType_name {
268+
expectedEncoding = append(expectedEncoding, enumspb.EncodingType(encodingType))
269+
}
270+
}
271+
expectedEncodingStr := make([]string, 0, len(expectedEncoding))
272+
for _, encodingType := range expectedEncoding {
273+
expectedEncodingStr = append(expectedEncodingStr, encodingType.String())
274+
}
275+
return &UnknownEncodingTypeError{
276+
encodingTypeStr: encodingTypeStr,
277+
expectedEncodingStr: expectedEncodingStr,
278+
}
259279
}
260280

261281
func (e *UnknownEncodingTypeError) Error() string {
262-
return fmt.Sprintf("unknown or unsupported encoding type %v", e.encodingType)
282+
return fmt.Sprintf("unknown or unsupported encoding type %v, supported types: %v",
283+
e.encodingTypeStr,
284+
strings.Join(e.expectedEncodingStr, ","),
285+
)
263286
}
264287

265288
// NewSerializationError returns a SerializationError
266-
func NewSerializationError(msg string) error {
267-
return &SerializationError{msg: msg}
289+
func NewSerializationError(
290+
encodingType enumspb.EncodingType,
291+
serializationErr error,
292+
) error {
293+
return &SerializationError{
294+
encodingType: encodingType,
295+
wrappedErr: serializationErr,
296+
}
268297
}
269298

270299
func (e *SerializationError) Error() string {
271-
return fmt.Sprintf("serialization error: %v", e.msg)
300+
return fmt.Sprintf("error serializing using %v encoding: %v", e.encodingType, e.wrappedErr)
301+
}
302+
303+
func (e *SerializationError) Unwrap() error {
304+
return e.wrappedErr
272305
}
273306

274307
// NewDeserializationError returns a DeserializationError
275-
func NewDeserializationError(msg string) error {
276-
return &DeserializationError{msg: msg}
308+
func NewDeserializationError(
309+
encodingType enumspb.EncodingType,
310+
deserializationErr error,
311+
) error {
312+
return &DeserializationError{
313+
encodingType: encodingType,
314+
wrappedErr: deserializationErr,
315+
}
277316
}
278317

279318
func (e *DeserializationError) Error() string {
280-
return fmt.Sprintf("deserialization error: %v", e.msg)
319+
return fmt.Sprintf("error deserializing using %v encoding: %v", e.encodingType, e.wrappedErr)
320+
}
321+
322+
func (e *DeserializationError) Unwrap() error {
323+
return e.wrappedErr
281324
}
282325

283326
func (t *serializerImpl) ShardInfoToBlob(info *persistencespb.ShardInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
@@ -469,23 +512,23 @@ func (t *serializerImpl) ReplicationTaskFromBlob(data *commonpb.DataBlob) (*repl
469512
func ProtoDecodeBlob(data *commonpb.DataBlob, result proto.Message) error {
470513
if data == nil {
471514
// TODO: should we return nil or error?
472-
return NewDeserializationError("cannot decode nil")
515+
return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil"))
473516
}
474517

475518
if data.EncodingType != enumspb.ENCODING_TYPE_PROTO3 {
476-
return NewDeserializationError(fmt.Sprintf("encoding %v doesn't match expected encoding %v", data.EncodingType, enumspb.ENCODING_TYPE_PROTO3))
519+
return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_PROTO3)
477520
}
478521

479522
if err := proto.Unmarshal(data.Data, result); err != nil {
480-
return NewDeserializationError(fmt.Sprintf("error deserializing blob using %v encoding: %s", enumspb.ENCODING_TYPE_PROTO3, err))
523+
return NewDeserializationError(enumspb.ENCODING_TYPE_PROTO3, err)
481524
}
482525
return nil
483526
}
484527

485528
func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
486529
if data == nil {
487530
// TODO: should we return nil or error?
488-
return NewDeserializationError("cannot decode nil")
531+
return NewDeserializationError(enumspb.ENCODING_TYPE_UNSPECIFIED, errors.New("cannot decode nil"))
489532
}
490533

491534
if data.Data == nil {
@@ -498,7 +541,7 @@ func decodeBlob(data *commonpb.DataBlob, result proto.Message) error {
498541
case enumspb.ENCODING_TYPE_PROTO3:
499542
return ProtoDecodeBlob(data, result)
500543
default:
501-
return NewUnknownEncodingTypeError(data.EncodingType)
544+
return NewUnknownEncodingTypeError(data.EncodingType.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
502545
}
503546
}
504547

@@ -523,13 +566,13 @@ func encodeBlob(o proto.Message, encoding enumspb.EncodingType) (*commonpb.DataB
523566
case enumspb.ENCODING_TYPE_PROTO3:
524567
return ProtoEncodeBlob(o, enumspb.ENCODING_TYPE_PROTO3)
525568
default:
526-
return nil, NewUnknownEncodingTypeError(encoding)
569+
return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_JSON, enumspb.ENCODING_TYPE_PROTO3)
527570
}
528571
}
529572

530573
func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.DataBlob, error) {
531574
if encoding != enumspb.ENCODING_TYPE_PROTO3 {
532-
return nil, NewUnknownEncodingTypeError(encoding)
575+
return nil, NewUnknownEncodingTypeError(encoding.String(), enumspb.ENCODING_TYPE_PROTO3)
533576
}
534577

535578
if m == nil || (reflect.ValueOf(m).Kind() == reflect.Ptr && reflect.ValueOf(m).IsNil()) {
@@ -543,7 +586,7 @@ func ProtoEncodeBlob(m proto.Message, encoding enumspb.EncodingType) (*commonpb.
543586
blob := &commonpb.DataBlob{EncodingType: enumspb.ENCODING_TYPE_PROTO3}
544587
data, err := proto.Marshal(m)
545588
if err != nil {
546-
return nil, NewSerializationError(err.Error())
589+
return nil, NewSerializationError(enumspb.ENCODING_TYPE_PROTO3, err)
547590
}
548591
blob.Data = data
549592
return blob, nil

service/history/queues/executable.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ package queues
2828

2929
import (
3030
"context"
31+
"errors"
32+
"fmt"
33+
"runtime/debug"
3134
"sync"
3235
"time"
3336

@@ -42,6 +45,7 @@ import (
4245
"go.temporal.io/server/common/log/tag"
4346
"go.temporal.io/server/common/metrics"
4447
"go.temporal.io/server/common/namespace"
48+
"go.temporal.io/server/common/persistence/serialization"
4549
ctasks "go.temporal.io/server/common/tasks"
4650
"go.temporal.io/server/common/util"
4751
"go.temporal.io/server/service/history/consts"
@@ -178,19 +182,22 @@ func (e *executableImpl) Execute() (retErr error) {
178182
headers.NewBackgroundCallerInfo(ns.String()),
179183
)
180184

181-
var panicErr error
182185
defer func() {
183-
if panicErr != nil {
184-
retErr = panicErr
186+
if panicObj := recover(); panicObj != nil {
187+
err, ok := panicObj.(error)
188+
if !ok {
189+
err = serviceerror.NewInternal(fmt.Sprintf("panic: %v", panicObj))
190+
}
191+
192+
e.logger.Error("Panic is captured", tag.SysStackTrace(string(debug.Stack())), tag.Error(err))
193+
retErr = err
185194

186195
// we need to guess the metrics tags here as we don't know which execution logic
187196
// is actually used, which is up to the executor implementation
188197
e.taggedMetricsHandler = e.metricsHandler.WithTags(e.estimateTaskMetricTag()...)
189198
}
190199
}()
191200

192-
defer log.CapturePanic(e.logger, &panicErr)
193-
194201
startTime := e.timeSource.Now()
195202

196203
metricsTags, isActive, err := e.executor.Execute(ctx, e)
@@ -291,6 +298,16 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
291298
return err
292299
}
293300

301+
var deserializationError *serialization.DeserializationError
302+
var encodingTypeError *serialization.UnknownEncodingTypeError
303+
if errors.As(err, &deserializationError) || errors.As(err, &encodingTypeError) {
304+
// likely due to data corruption; emit logs and metrics, and drop the task by returning nil so that the
305+
// task will be marked as completed.
306+
e.taggedMetricsHandler.Counter(metrics.TaskCorruptionCounter.GetMetricName()).Record(1)
307+
e.logger.Error("Drop task due to serialization error", tag.Error(err))
308+
return nil
309+
}
310+
294311
e.taggedMetricsHandler.Counter(metrics.TaskFailures.GetMetricName()).Record(1)
295312

296313
e.logger.Error("Fail to process task", tag.Error(err), tag.LifeCycleProcessingFailed)

0 commit comments

Comments
 (0)