@@ -17,49 +17,58 @@ package zipkin
17
17
import (
18
18
"compress/gzip"
19
19
"fmt"
20
+ "io"
20
21
"io/ioutil"
21
22
"net/http"
22
23
"strings"
23
24
"time"
24
25
25
26
"github.com/apache/thrift/lib/go/thrift"
27
+ "github.com/go-openapi/loads"
28
+ "github.com/go-openapi/strfmt"
29
+ "github.com/go-openapi/swag"
26
30
"github.com/gorilla/mux"
27
31
tchanThrift "github.com/uber/tchannel-go/thrift"
28
32
29
33
"github.com/jaegertracing/jaeger/cmd/collector/app"
34
+ "github.com/jaegertracing/jaeger/swagger-gen/models"
35
+ "github.com/jaegertracing/jaeger/swagger-gen/restapi"
36
+ "github.com/jaegertracing/jaeger/swagger-gen/restapi/operations"
30
37
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
31
38
)
32
39
33
40
// APIHandler handles all HTTP calls to the collector
34
41
type APIHandler struct {
35
42
zipkinSpansHandler app.ZipkinSpansHandler
43
+ zipkinV2Formats strfmt.Registry
36
44
}
37
45
38
46
// NewAPIHandler returns a new APIHandler
39
47
func NewAPIHandler (
40
48
zipkinSpansHandler app.ZipkinSpansHandler ,
41
- ) * APIHandler {
49
+ ) (* APIHandler , error ) {
50
+ swaggerSpec , _ := loads .Analyzed (restapi .SwaggerJSON , "" )
42
51
return & APIHandler {
43
52
zipkinSpansHandler : zipkinSpansHandler ,
44
- }
53
+ zipkinV2Formats : operations .NewZipkinAPI (swaggerSpec ).Formats (),
54
+ }, nil
45
55
}
46
56
47
57
// RegisterRoutes registers Zipkin routes
48
58
func (aH * APIHandler ) RegisterRoutes (router * mux.Router ) {
49
59
router .HandleFunc ("/api/v1/spans" , aH .saveSpans ).Methods (http .MethodPost )
60
+ router .HandleFunc ("/api/v2/spans" , aH .saveSpansV2 ).Methods (http .MethodPost )
50
61
}
51
62
52
63
func (aH * APIHandler ) saveSpans (w http.ResponseWriter , r * http.Request ) {
53
64
bRead := r .Body
54
65
defer r .Body .Close ()
55
-
56
66
if strings .Contains (r .Header .Get ("Content-Encoding" ), "gzip" ) {
57
- gz , err := gzip . NewReader ( r . Body )
67
+ gz , err := gunzip ( bRead )
58
68
if err != nil {
59
69
http .Error (w , fmt .Sprintf (app .UnableToReadBodyErrFormat , err ), http .StatusBadRequest )
60
70
return
61
71
}
62
- defer gz .Close ()
63
72
bRead = gz
64
73
}
65
74
@@ -84,15 +93,73 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
84
93
return
85
94
}
86
95
87
- if len (tSpans ) > 0 {
88
- ctx , _ := tchanThrift .NewContext (time .Minute )
89
- if _ , err = aH .zipkinSpansHandler .SubmitZipkinBatch (ctx , tSpans ); err != nil {
90
- http .Error (w , fmt .Sprintf ("Cannot submit Zipkin batch: %v" , err ), http .StatusInternalServerError )
96
+ if err := aH .saveThriftSpans (tSpans ); err != nil {
97
+ http .Error (w , fmt .Sprintf ("Cannot submit Zipkin batch: %v" , err ), http .StatusInternalServerError )
98
+ return
99
+ }
100
+
101
+ w .WriteHeader (http .StatusAccepted )
102
+ }
103
+
104
+ func (aH * APIHandler ) saveSpansV2 (w http.ResponseWriter , r * http.Request ) {
105
+ bRead := r .Body
106
+ defer r .Body .Close ()
107
+ if strings .Contains (r .Header .Get ("Content-Encoding" ), "gzip" ) {
108
+ gz , err := gunzip (bRead )
109
+ if err != nil {
110
+ http .Error (w , fmt .Sprintf (app .UnableToReadBodyErrFormat , err ), http .StatusBadRequest )
91
111
return
92
112
}
113
+ bRead = gz
93
114
}
94
115
95
- w .WriteHeader (http .StatusAccepted )
116
+ bodyBytes , err := ioutil .ReadAll (bRead )
117
+ if err != nil {
118
+ http .Error (w , fmt .Sprintf (app .UnableToReadBodyErrFormat , err ), http .StatusInternalServerError )
119
+ return
120
+ }
121
+
122
+ var spans models.ListOfSpans
123
+ if err = swag .ReadJSON (bodyBytes , & spans ); err != nil {
124
+ http .Error (w , fmt .Sprintf (app .UnableToReadBodyErrFormat , err ), http .StatusBadRequest )
125
+ return
126
+ }
127
+ if err = spans .Validate (aH .zipkinV2Formats ); err != nil {
128
+ http .Error (w , fmt .Sprintf (app .UnableToReadBodyErrFormat , err ), http .StatusBadRequest )
129
+ return
130
+ }
131
+
132
+ tSpans , err := spansV2ToThrift (& spans )
133
+ if err != nil {
134
+ http .Error (w , fmt .Sprintf (app .UnableToReadBodyErrFormat , err ), http .StatusBadRequest )
135
+ return
136
+ }
137
+
138
+ if err := aH .saveThriftSpans (tSpans ); err != nil {
139
+ http .Error (w , fmt .Sprintf ("Cannot submit Zipkin batch: %v" , err ), http .StatusInternalServerError )
140
+ return
141
+ }
142
+
143
+ w .WriteHeader (operations .PostSpansAcceptedCode )
144
+ }
145
+
146
+ func gunzip (r io.ReadCloser ) (* gzip.Reader , error ) {
147
+ gz , err := gzip .NewReader (r )
148
+ if err != nil {
149
+ return nil , err
150
+ }
151
+ defer gz .Close ()
152
+ return gz , nil
153
+ }
154
+
155
+ func (aH * APIHandler ) saveThriftSpans (tSpans []* zipkincore.Span ) error {
156
+ if len (tSpans ) > 0 {
157
+ ctx , _ := tchanThrift .NewContext (time .Minute )
158
+ if _ , err := aH .zipkinSpansHandler .SubmitZipkinBatch (ctx , tSpans ); err != nil {
159
+ return err
160
+ }
161
+ }
162
+ return nil
96
163
}
97
164
98
165
func deserializeThrift (b []byte ) ([]* zipkincore.Span , error ) {
0 commit comments