@@ -16,6 +16,7 @@ package main
16
16
17
17
import (
18
18
"fmt"
19
+ "io"
19
20
"log"
20
21
"os"
21
22
"os/signal"
@@ -28,10 +29,10 @@ import (
28
29
"github.com/jaegertracing/jaeger/cmd/env"
29
30
"github.com/jaegertracing/jaeger/cmd/flags"
30
31
"github.com/jaegertracing/jaeger/cmd/ingester/app"
31
- spanconsumer "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
32
+ "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer"
32
33
"github.com/jaegertracing/jaeger/cmd/ingester/app/processor"
33
34
"github.com/jaegertracing/jaeger/pkg/config"
34
- "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
35
+ kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
35
36
pMetrics "github.com/jaegertracing/jaeger/pkg/metrics"
36
37
"github.com/jaegertracing/jaeger/pkg/version"
37
38
"github.com/jaegertracing/jaeger/plugin/storage"
@@ -92,7 +93,7 @@ func main() {
92
93
}
93
94
spanProcessor := processor .NewSpanProcessor (spParams )
94
95
95
- consumerConfig := consumer .Configuration {
96
+ consumerConfig := kafkaConsumer .Configuration {
96
97
Brokers : options .Brokers ,
97
98
Topic : options .Topic ,
98
99
GroupID : options .GroupID ,
@@ -102,39 +103,46 @@ func main() {
102
103
logger .Fatal ("Failed to create sarama consumer" , zap .Error (err ))
103
104
}
104
105
105
- factoryParams := spanconsumer .ProcessorFactoryParams {
106
+ factoryParams := consumer .ProcessorFactoryParams {
106
107
Topic : options .Topic ,
107
108
Parallelism : options .Parallelism ,
108
109
SaramaConsumer : saramaConsumer ,
109
110
BaseProcessor : spanProcessor ,
110
111
Logger : logger ,
111
112
Factory : metricsFactory ,
112
113
}
113
- processorFactory , err := spanconsumer .NewProcessorFactory (factoryParams )
114
+ processorFactory , err := consumer .NewProcessorFactory (factoryParams )
114
115
if err != nil {
115
116
logger .Fatal ("Failed to create processor factory" , zap .Error (err ))
116
117
}
117
118
118
- consumerParams := spanconsumer .Params {
119
+ consumerParams := consumer .Params {
119
120
InternalConsumer : saramaConsumer ,
120
121
ProcessorFactory : * processorFactory ,
121
122
Factory : metricsFactory ,
122
123
Logger : logger ,
123
124
}
124
- kafkaConsumer , err := spanconsumer .New (consumerParams )
125
+ spanConsumer , err := consumer .New (consumerParams )
125
126
if err != nil {
126
127
logger .Fatal ("Unable to set up consumer" , zap .Error (err ))
127
128
}
128
- kafkaConsumer .Start ()
129
+ spanConsumer .Start ()
129
130
130
131
hc .Ready ()
131
132
select {
132
133
case <- signalsChannel :
133
- err := kafkaConsumer .Close ()
134
+ logger .Info ("Jaeger Ingester is starting to close" )
135
+ err := spanConsumer .Close ()
134
136
if err != nil {
135
- logger .Error ("Failed to close span writer " , zap .Error (err ))
137
+ logger .Error ("Failed to close consumer " , zap .Error (err ))
136
138
}
137
- logger .Info ("Jaeger Ingester is finishing" )
139
+ if closer , ok := spanWriter .(io.Closer ); ok {
140
+ err := closer .Close ()
141
+ if err != nil {
142
+ logger .Error ("Failed to close span writer" , zap .Error (err ))
143
+ }
144
+ }
145
+ logger .Info ("Jaeger Ingester has finished closing" )
138
146
}
139
147
return nil
140
148
},
0 commit comments