Skip to content

Commit ccb2cad

Browse files
committed
Collect Zipkin v2 json
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
1 parent a2ed9b8 commit ccb2cad

27 files changed

+1863
-44
lines changed

.gitmodules

+3
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44
[submodule "jaeger-ui"]
55
path = jaeger-ui
66
url = https://github.com/uber/jaeger-ui
7+
[submodule "zipkin-api"]
8+
path = zipkin-api
9+
url = https://github.com/openzipkin/zipkin-api

Makefile

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
PROJECT_ROOT=github.com/uber/jaeger
2-
TOP_PKGS := $(shell glide novendor | grep -v -e ./thrift-gen/... -e ./examples/... -e ./scripts/...)
2+
TOP_PKGS := $(shell glide novendor | grep -v -e ./thrift-gen/... -e swagger-gen... -e ./examples/... -e ./scripts/...)
33

44
# all .go files that don't exist in hidden directories
5-
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen \
5+
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen -e swagger-gen \
66
-e ".*/\..*" \
77
-e ".*/_.*" \
88
-e ".*/mocks.*")
@@ -34,6 +34,11 @@ THRIFT_GO_ARGS=thrift_import="github.com/apache/thrift/lib/go/thrift"
3434
THRIFT_GEN=$(shell which thrift-gen)
3535
THRIFT_GEN_DIR=thrift-gen
3636

37+
SWAGGER_IMG_VER=0.12.0
38+
SWAGGER_IMAGE=quay.io/goswagger/swagger:$(SWAGGER_IMG_VER)
39+
SWAGGER=docker run --rm -it -u ${shell id -u} -v "${PWD}:/go/src/${PROJECT_ROOT}" -w /go/src/${PROJECT_ROOT} $(SWAGGER_IMAGE)
40+
SWAGGER_GEN_DIR=swagger-gen
41+
3742
PASS=$(shell printf "\033[32mPASS\033[0m")
3843
FAIL=$(shell printf "\033[31mFAIL\033[0m")
3944
COLORIZE=$(SED) ''/PASS/s//$(PASS)/'' | $(SED) ''/FAIL/s//$(FAIL)/''
@@ -215,6 +220,11 @@ idl-submodule:
215220
git submodule init
216221
git submodule update
217222

223+
.PHONY: generate-zipkin-swagger
224+
generate-zipkin-swagger:
225+
$(SWAGGER) generate server -f ./zipkin-api/zipkin2-api.yaml -t $(SWAGGER_GEN_DIR) -O PostSpans --exclude-main
226+
rm $(SWAGGER_GEN_DIR)/restapi/operations/post_spans_urlbuilder.go $(SWAGGER_GEN_DIR)/restapi/server.go $(SWAGGER_GEN_DIR)/restapi/configure_zipkin.go $(SWAGGER_GEN_DIR)/models/trace.go $(SWAGGER_GEN_DIR)/models/list_of_traces.go $(SWAGGER_GEN_DIR)/models/dependency_link.go
227+
218228
.PHONY: thrift-image
219229
thrift-image:
220230
$(THRIFT) -version

cmd/collector/app/zipkin/http_handler.go

+80-10
Original file line numberDiff line numberDiff line change
@@ -17,49 +17,61 @@ package zipkin
1717
import (
1818
"compress/gzip"
1919
"fmt"
20+
"io"
2021
"io/ioutil"
2122
"net/http"
2223
"strings"
2324
"time"
2425

2526
"github.com/apache/thrift/lib/go/thrift"
27+
"github.com/go-openapi/loads"
28+
"github.com/go-openapi/swag"
2629
"github.com/gorilla/mux"
30+
"github.com/pkg/errors"
2731
tchanThrift "github.com/uber/tchannel-go/thrift"
2832

2933
"github.com/uber/jaeger/cmd/collector/app"
34+
"github.com/uber/jaeger/swagger-gen/models"
35+
"github.com/uber/jaeger/swagger-gen/restapi"
36+
"github.com/uber/jaeger/swagger-gen/restapi/operations"
3037
"github.com/uber/jaeger/thrift-gen/zipkincore"
3138
)
3239

3340
// APIHandler handles all HTTP calls to the collector
3441
type APIHandler struct {
3542
zipkinSpansHandler app.ZipkinSpansHandler
43+
zipkinV2API *operations.ZipkinAPI
3644
}
3745

3846
// NewAPIHandler returns a new APIHandler
3947
func NewAPIHandler(
4048
zipkinSpansHandler app.ZipkinSpansHandler,
41-
) *APIHandler {
49+
) (*APIHandler, error) {
50+
swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "")
51+
if err != nil {
52+
return nil, errors.Wrapf(err, "Failed to create zipkin swagger")
53+
}
4254
return &APIHandler{
4355
zipkinSpansHandler: zipkinSpansHandler,
44-
}
56+
zipkinV2API: operations.NewZipkinAPI(swaggerSpec),
57+
}, nil
4558
}
4659

4760
// RegisterRoutes registers Zipkin routes
4861
func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
4962
router.HandleFunc("/api/v1/spans", aH.saveSpans).Methods(http.MethodPost)
63+
router.HandleFunc("/api/v2/spans", aH.saveSpansV2).Methods(http.MethodPost)
5064
}
5165

5266
func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
5367
bRead := r.Body
5468
defer r.Body.Close()
55-
5669
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
57-
gz, err := gzip.NewReader(r.Body)
70+
gz, err := gunzip(bRead)
5871
if err != nil {
5972
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
6073
return
6174
}
62-
defer gz.Close()
6375
bRead = gz
6476
}
6577

@@ -84,15 +96,73 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
8496
return
8597
}
8698

87-
if len(tSpans) > 0 {
88-
ctx, _ := tchanThrift.NewContext(time.Minute)
89-
if _, err = aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil {
90-
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
99+
if err := aH.saveThriftSpans(tSpans); err != nil {
100+
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
101+
return
102+
}
103+
104+
w.WriteHeader(http.StatusAccepted)
105+
}
106+
107+
func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) {
108+
bRead := r.Body
109+
defer r.Body.Close()
110+
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
111+
gz, err := gunzip(bRead)
112+
if err != nil {
113+
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
91114
return
92115
}
116+
bRead = gz
93117
}
94118

95-
w.WriteHeader(http.StatusAccepted)
119+
bodyBytes, err := ioutil.ReadAll(bRead)
120+
if err != nil {
121+
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusInternalServerError)
122+
return
123+
}
124+
125+
var spans models.ListOfSpans
126+
if err = swag.ReadJSON(bodyBytes, &spans); err != nil {
127+
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
128+
return
129+
}
130+
if err = spans.Validate(aH.zipkinV2API.Formats()); err != nil {
131+
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
132+
return
133+
}
134+
135+
tSpans, err := spansV2ToThrift(&spans)
136+
if err != nil {
137+
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
138+
return
139+
}
140+
141+
if err := aH.saveThriftSpans(tSpans); err != nil {
142+
http.Error(w, fmt.Sprintf("Cannot submit Zipkin batch: %v", err), http.StatusInternalServerError)
143+
return
144+
}
145+
146+
w.WriteHeader(operations.PostSpansAcceptedCode)
147+
}
148+
149+
func gunzip(r io.ReadCloser) (*gzip.Reader, error) {
150+
gz, err := gzip.NewReader(r)
151+
if err != nil {
152+
return nil, err
153+
}
154+
defer gz.Close()
155+
return gz, nil
156+
}
157+
158+
func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error {
159+
if len(tSpans) > 0 {
160+
ctx, _ := tchanThrift.NewContext(time.Minute)
161+
if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil {
162+
return err
163+
}
164+
}
165+
return nil
96166
}
97167

98168
func deserializeThrift(b []byte) ([]*zipkincore.Span, error) {

cmd/collector/app/zipkin/http_handler_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (p *mockZipkinHandler) getSpans() []*zipkincore.Span {
5959

6060
func initializeTestServer(err error) (*httptest.Server, *APIHandler) {
6161
r := mux.NewRouter()
62-
handler := NewAPIHandler(&mockZipkinHandler{err: err})
62+
handler, _ := NewAPIHandler(&mockZipkinHandler{err: err})
6363
handler.RegisterRoutes(r)
6464
return httptest.NewServer(r), handler
6565
}
@@ -224,7 +224,8 @@ func TestDeserializeWithBadListStart(t *testing.T) {
224224
}
225225

226226
func TestCannotReadBodyFromRequest(t *testing.T) {
227-
handler := NewAPIHandler(&mockZipkinHandler{})
227+
handler, err := NewAPIHandler(&mockZipkinHandler{})
228+
require.NoError(t, err)
228229
req, err := http.NewRequest(http.MethodPost, "whatever", &errReader{})
229230
assert.NoError(t, err)
230231
rw := dummyResponseWriter{}

cmd/collector/app/zipkin/json.go

+15-13
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,34 @@ func cutLongID(id string) string {
152152
return id
153153
}
154154

155-
func endpointToThrift(e endpoint) (*zipkincore.Endpoint, error) {
156-
ipv4, err := parseIpv4(e.IPv4)
155+
func endpointToThrift(ip4 string, ip6 string, p int32, service string) (*zipkincore.Endpoint, error) {
156+
ipv4, err := parseIpv4(ip4)
157157
if err != nil {
158158
return nil, err
159159
}
160-
port := e.Port
161-
if port >= (1 << 15) {
162-
// Zipkin.thrift defines port as i16, so values between (2^15 and 2^16-1) must be encoded as negative
163-
port = port - (1 << 16)
164-
}
165-
166-
ipv6, err := parseIpv6(e.IPv6)
160+
port := port(p)
161+
ipv6, err := parseIpv6(string(ip6))
167162
if err != nil {
168163
return nil, err
169164
}
170-
171165
return &zipkincore.Endpoint{
172-
ServiceName: e.ServiceName,
166+
ServiceName: service,
173167
Port: int16(port),
174168
Ipv4: ipv4,
175169
Ipv6: ipv6,
176170
}, nil
177171
}
178172

173+
func port(p int32) int32 {
174+
if p >= (1 << 15) {
175+
// Zipkin.thrift defines port as i16, so values between (2^15 and 2^16-1) must be encoded as negative
176+
p = p - (1 << 16)
177+
}
178+
return p
179+
}
180+
179181
func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
180-
endpoint, err := endpointToThrift(a.Endpoint)
182+
endpoint, err := endpointToThrift(a.Endpoint.IPv4, a.Endpoint.IPv6, a.Endpoint.Port, a.Endpoint.ServiceName)
181183
if err != nil {
182184
return nil, err
183185
}
@@ -190,7 +192,7 @@ func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
190192
}
191193

192194
func binAnnoToThrift(ba binaryAnnotation) (*zipkincore.BinaryAnnotation, error) {
193-
endpoint, err := endpointToThrift(ba.Endpoint)
195+
endpoint, err := endpointToThrift(ba.Endpoint.IPv4, ba.Endpoint.IPv6, ba.Endpoint.Port, ba.Endpoint.ServiceName)
194196
if err != nil {
195197
return nil, err
196198
}

cmd/collector/app/zipkin/json_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func TestEndpointToThrift(t *testing.T) {
217217
Port: 80,
218218
IPv4: "127.0.0.1",
219219
}
220-
tEndpoint, err := endpointToThrift(endp)
220+
tEndpoint, err := endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
221221
require.NoError(t, err)
222222
assert.Equal(t, "foo", tEndpoint.ServiceName)
223223
assert.Equal(t, int16(80), tEndpoint.Port)
@@ -228,7 +228,7 @@ func TestEndpointToThrift(t *testing.T) {
228228
Port: 80,
229229
IPv4: "",
230230
}
231-
tEndpoint, err = endpointToThrift(endp)
231+
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
232232
require.NoError(t, err)
233233
assert.Equal(t, "foo", tEndpoint.ServiceName)
234234
assert.Equal(t, int16(80), tEndpoint.Port)
@@ -239,7 +239,7 @@ func TestEndpointToThrift(t *testing.T) {
239239
Port: 80,
240240
IPv4: "127.0.0.A",
241241
}
242-
tEndpoint, err = endpointToThrift(endp)
242+
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
243243
require.Error(t, err)
244244
assert.Equal(t, errWrongIpv4, err)
245245
assert.Nil(t, tEndpoint)
@@ -249,7 +249,7 @@ func TestEndpointToThrift(t *testing.T) {
249249
Port: 80,
250250
IPv6: "::R",
251251
}
252-
tEndpoint, err = endpointToThrift(endp)
252+
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
253253
require.Error(t, err)
254254
assert.Equal(t, errWrongIpv6, err)
255255
assert.Nil(t, tEndpoint)

0 commit comments

Comments
 (0)