diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 8d2b2723335..9afd69d03ca 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -179,7 +179,7 @@ func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*mode span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace") defer span.Finish() currentTime := time.Now() - traces, err := s.multiRead(ctx, []string{traceID.String()}, currentTime.Add(-s.maxSpanAge), currentTime) + traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime) if err != nil { return nil, err } @@ -251,25 +251,34 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces") defer span.Finish() + uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery) + if err != nil { + return nil, err + } + return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax) +} + +// FindTraceIDs retrieves traces IDs that match the traceQuery +func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs") + defer span.Finish() + if err := validateQuery(traceQuery); err != nil { return nil, err } if traceQuery.NumTraces == 0 { traceQuery.NumTraces = defaultNumTraces } - uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery) + + esTraceIDs, err := s.findTraceIDs(ctx, traceQuery) if err != nil { return nil, err } - return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax) -} -// FindTraceIDs is not implemented. -func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { - return nil, errors.New("not implemented") // TODO: Implement + return convertTraceIDsStringsToModels(esTraceIDs) } -func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) { +func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead") childSpan.LogFields(otlog.Object("trace_ids", traceIDs)) @@ -286,16 +295,16 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) - searchAfterTime := make(map[string]uint64) - totalDocumentsFetched := make(map[string]int) - tracesMap := make(map[string]*model.Trace) + searchAfterTime := make(map[model.TraceID]uint64) + totalDocumentsFetched := make(map[model.TraceID]int) + tracesMap := make(map[model.TraceID]*model.Trace) for { if len(traceIDs) == 0 { break } for i, traceID := range traceIDs { - query := elastic.NewTermQuery("traceID", traceID) + query := elastic.NewTermQuery("traceID", traceID.String()) if val, ok := searchAfterTime[traceID]; ok { nextTime = val } @@ -333,18 +342,17 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime return nil, err } lastSpan := spans[len(spans)-1] - lastSpanTraceID := lastSpan.TraceID.String() - if traceSpan, ok := tracesMap[lastSpanTraceID]; ok { + if traceSpan, ok := tracesMap[lastSpan.TraceID]; ok { traceSpan.Spans = append(traceSpan.Spans, spans...) } else { - tracesMap[lastSpanTraceID] = &model.Trace{Spans: spans} + tracesMap[lastSpan.TraceID] = &model.Trace{Spans: spans} } - totalDocumentsFetched[lastSpanTraceID] = totalDocumentsFetched[lastSpanTraceID] + len(result.Hits.Hits) - if totalDocumentsFetched[lastSpanTraceID] < int(result.TotalHits()) { - traceIDs = append(traceIDs, lastSpanTraceID) - searchAfterTime[lastSpanTraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime) + totalDocumentsFetched[lastSpan.TraceID] = totalDocumentsFetched[lastSpan.TraceID] + len(result.Hits.Hits) + if totalDocumentsFetched[lastSpan.TraceID] < int(result.TotalHits()) { + traceIDs = append(traceIDs, lastSpan.TraceID) + searchAfterTime[lastSpan.TraceID] = model.TimeAsEpochMicroseconds(lastSpan.StartTime) } } } @@ -355,6 +363,20 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime return traces, nil } +func convertTraceIDsStringsToModels(traceIDs []string) ([]model.TraceID, error) { + traceIDsModels := make([]model.TraceID, len(traceIDs)) + for i, ID := range traceIDs { + traceID, err := model.TraceIDFromString(ID) + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("Making traceID from string '%s' failed", ID)) + } + + traceIDsModels[i] = traceID + } + + return traceIDsModels, nil +} + func validateQuery(p *spanstore.TraceQueryParameters) error { if p == nil { return ErrMalformedRequestObject diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 68146668e16..080b560fbec 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -23,11 +23,11 @@ import ( "testing" "time" - "github.com/uber/jaeger-lib/metrics/metricstest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" + "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" "gopkg.in/olivere/elastic.v5" @@ -145,23 +145,23 @@ func TestSpanReaderIndices(t *testing.T) { dateFormat := date.UTC().Format("2006-01-02") testCases := []struct { indices []string - params SpanReaderParams + params SpanReaderParams }{ - {params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix:"", Archive: false}, - indices: []string{spanIndex+dateFormat}}, - {params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix:"foo:", Archive: false}, - indices: []string{"foo:"+indexPrefixSeparator+spanIndex+dateFormat,"foo:"+indexPrefixSeparatorDeprecated+spanIndex+dateFormat}}, - {params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix:"", Archive: true}, - indices: []string{spanIndex+archiveIndexSuffix}}, - {params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix:"foo:", Archive: true}, - indices: []string{"foo:"+indexPrefixSeparator+spanIndex+archiveIndexSuffix}}, - {params:SpanReaderParams{Client:client, Logger: logger, MetricsFactory: metricsFactory, - IndexPrefix:"foo:", Archive: true, UseReadWriteAliases:true}, - indices: []string{"foo:"+indexPrefixSeparator+spanIndex+archiveReadIndexSuffix}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "", Archive: false}, + indices: []string{spanIndex + dateFormat}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "foo:", Archive: false}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat, "foo:" + indexPrefixSeparatorDeprecated + spanIndex + dateFormat}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "", Archive: true}, + indices: []string{spanIndex + archiveIndexSuffix}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "foo:", Archive: true}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}}, + {params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, + IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true}, + indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}}, } for _, testCase := range testCases { r := NewSpanReader(testCase.params) @@ -702,12 +702,15 @@ func TestFindTraceIDs(t *testing.T) { testGet(traceIDAggregation, t) } -func TestFindTraceIDNotImplemented(t *testing.T) { - withSpanReader(func(r *spanReaderTest) { - traceIDs, err := r.reader.FindTraceIDs(context.Background(), nil) - assert.Nil(t, traceIDs) - assert.EqualError(t, err, "not implemented") - }) +func TestTraceIDsStringsToModelsConversion(t *testing.T) { + traceIDs, err := convertTraceIDsStringsToModels([]string{"1", "2", "3"}) + assert.NoError(t, err) + assert.Equal(t, 3, len(traceIDs)) + assert.Equal(t, "1", traceIDs[0].String()) + + traceIDs, err = convertTraceIDsStringsToModels([]string{"dsfjsdklfjdsofdfsdbfkgbgoaemlrksdfbsdofgerjl"}) + assert.EqualError(t, err, "Making traceID from string 'dsfjsdklfjdsofdfsdbfkgbgoaemlrksdfbsdofgerjl' failed: TraceID cannot be longer than 32 hex characters: dsfjsdklfjdsofdfsdbfkgbgoaemlrksdfbsdofgerjl") + assert.Equal(t, 0, len(traceIDs)) } func mockMultiSearchService(r *spanReaderTest) *mock.Call {