Skip to content

Commit 91e99be

Browse files
committed
Make xdock work
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
1 parent 4b12db5 commit 91e99be

File tree

7 files changed

+118
-37
lines changed

7 files changed

+118
-37
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
PROJECT_ROOT=github.com/uber/jaeger
2-
TOP_PKGS := $(shell glide novendor | grep -v -e ./thrift-gen/... -e ./examples/... -e ./scripts/...)
2+
TOP_PKGS := $(shell glide novendor | grep -v -e ./thrift-gen/... -e swagger-gen... -e ./examples/... -e ./scripts/...)
33

44
# all .go files that don't exist in hidden directories
55
ALL_SRC := $(shell find . -name "*.go" | grep -v -e vendor -e thrift-gen -e swagger-gen \

cmd/collector/app/zipkin/http_handler.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package zipkin
1717
import (
1818
"compress/gzip"
1919
"fmt"
20+
"io"
2021
"io/ioutil"
2122
"net/http"
2223
"strings"
@@ -65,14 +66,12 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
6566
func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
6667
bRead := r.Body
6768
defer r.Body.Close()
68-
6969
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
70-
gz, err := gzip.NewReader(r.Body)
70+
gz, err := gunzip(bRead)
7171
if err != nil {
7272
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
7373
return
7474
}
75-
defer gz.Close()
7675
bRead = gz
7776
}
7877

@@ -108,15 +107,23 @@ func (aH *APIHandler) saveSpans(w http.ResponseWriter, r *http.Request) {
108107
func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) {
109108
bRead := r.Body
110109
defer r.Body.Close()
110+
if strings.Contains(r.Header.Get("Content-Encoding"), "gzip") {
111+
gz, err := gunzip(bRead)
112+
if err != nil {
113+
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
114+
return
115+
}
116+
bRead = gz
117+
}
111118

112119
bodyBytes, err := ioutil.ReadAll(bRead)
113120
if err != nil {
114121
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusInternalServerError)
115122
return
116123
}
117124

118-
var spans *models.ListOfSpans = &models.ListOfSpans{}
119-
if err = swag.ReadJSON(bodyBytes, spans); err != nil {
125+
var spans models.ListOfSpans
126+
if err = swag.ReadJSON(bodyBytes, &spans); err != nil {
120127
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
121128
return
122129
}
@@ -125,7 +132,7 @@ func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) {
125132
return
126133
}
127134

128-
tSpans, err := spansV2ToThrift(spans)
135+
tSpans, err := spansV2ToThrift(&spans)
129136
if err != nil {
130137
http.Error(w, fmt.Sprintf(app.UnableToReadBodyErrFormat, err), http.StatusBadRequest)
131138
return
@@ -139,6 +146,15 @@ func (aH *APIHandler) saveSpansV2(w http.ResponseWriter, r *http.Request) {
139146
w.WriteHeader(operations.PostSpansAcceptedCode)
140147
}
141148

149+
func gunzip(r io.ReadCloser) (*gzip.Reader, error) {
150+
gz, err := gzip.NewReader(r)
151+
if err != nil {
152+
return nil, err
153+
}
154+
defer gz.Close()
155+
return gz, nil
156+
}
157+
142158
func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error {
143159
if len(tSpans) > 0 {
144160
ctx, _ := tchanThrift.NewContext(time.Minute)

cmd/collector/app/zipkin/json.go

+15-13
Original file line numberDiff line numberDiff line change
@@ -152,32 +152,34 @@ func cutLongID(id string) string {
152152
return id
153153
}
154154

155-
func endpointToThrift(e endpoint) (*zipkincore.Endpoint, error) {
156-
ipv4, err := parseIpv4(e.IPv4)
155+
func endpointToThrift(ip4 string, ip6 string, p int32, service string) (*zipkincore.Endpoint, error) {
156+
ipv4, err := parseIpv4(ip4)
157157
if err != nil {
158158
return nil, err
159159
}
160-
port := e.Port
161-
if port >= (1 << 15) {
162-
// Zipkin.thrift defines port as i16, so values between (2^15 and 2^16-1) must be encoded as negative
163-
port = port - (1 << 16)
164-
}
165-
166-
ipv6, err := parseIpv6(e.IPv6)
160+
port := port(p)
161+
ipv6, err := parseIpv6(string(ip6))
167162
if err != nil {
168163
return nil, err
169164
}
170-
171165
return &zipkincore.Endpoint{
172-
ServiceName: e.ServiceName,
166+
ServiceName: service,
173167
Port: int16(port),
174168
Ipv4: ipv4,
175169
Ipv6: ipv6,
176170
}, nil
177171
}
178172

173+
func port(p int32) int32 {
174+
if p >= (1 << 15) {
175+
// Zipkin.thrift defines port as i16, so values between (2^15 and 2^16-1) must be encoded as negative
176+
p = p - (1 << 16)
177+
}
178+
return p
179+
}
180+
179181
func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
180-
endpoint, err := endpointToThrift(a.Endpoint)
182+
endpoint, err := endpointToThrift(a.Endpoint.IPv4, a.Endpoint.IPv6, a.Endpoint.Port, a.Endpoint.ServiceName)
181183
if err != nil {
182184
return nil, err
183185
}
@@ -190,7 +192,7 @@ func annoToThrift(a annotation) (*zipkincore.Annotation, error) {
190192
}
191193

192194
func binAnnoToThrift(ba binaryAnnotation) (*zipkincore.BinaryAnnotation, error) {
193-
endpoint, err := endpointToThrift(ba.Endpoint)
195+
endpoint, err := endpointToThrift(ba.Endpoint.IPv4, ba.Endpoint.IPv6, ba.Endpoint.Port, ba.Endpoint.ServiceName)
194196
if err != nil {
195197
return nil, err
196198
}

cmd/collector/app/zipkin/json_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func TestEndpointToThrift(t *testing.T) {
217217
Port: 80,
218218
IPv4: "127.0.0.1",
219219
}
220-
tEndpoint, err := endpointToThrift(endp)
220+
tEndpoint, err := endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
221221
require.NoError(t, err)
222222
assert.Equal(t, "foo", tEndpoint.ServiceName)
223223
assert.Equal(t, int16(80), tEndpoint.Port)
@@ -228,7 +228,7 @@ func TestEndpointToThrift(t *testing.T) {
228228
Port: 80,
229229
IPv4: "",
230230
}
231-
tEndpoint, err = endpointToThrift(endp)
231+
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
232232
require.NoError(t, err)
233233
assert.Equal(t, "foo", tEndpoint.ServiceName)
234234
assert.Equal(t, int16(80), tEndpoint.Port)
@@ -239,7 +239,7 @@ func TestEndpointToThrift(t *testing.T) {
239239
Port: 80,
240240
IPv4: "127.0.0.A",
241241
}
242-
tEndpoint, err = endpointToThrift(endp)
242+
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
243243
require.Error(t, err)
244244
assert.Equal(t, errWrongIpv4, err)
245245
assert.Nil(t, tEndpoint)
@@ -249,7 +249,7 @@ func TestEndpointToThrift(t *testing.T) {
249249
Port: 80,
250250
IPv6: "::R",
251251
}
252-
tEndpoint, err = endpointToThrift(endp)
252+
tEndpoint, err = endpointToThrift(endp.IPv4, endp.IPv6, endp.Port, endp.ServiceName)
253253
require.Error(t, err)
254254
assert.Equal(t, errWrongIpv6, err)
255255
assert.Nil(t, tEndpoint)

cmd/collector/app/zipkin/jsonv2.go

+42-11
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package zipkin
1616

1717
import (
18+
"fmt"
1819
"github.com/uber/jaeger/model"
1920
"github.com/uber/jaeger/swagger-gen/models"
2021
"github.com/uber/jaeger/thrift-gen/zipkincore"
@@ -60,11 +61,16 @@ func spanV2ToThrift(s models.Span) (*zipkincore.Span, error) {
6061
tSpan.ParentID = &signed
6162
}
6263

64+
localE, err := endpointToThrift(string(s.LocalEndpoint.IPV4),
65+
string(s.LocalEndpoint.IPV6),
66+
int32(s.LocalEndpoint.Port),
67+
s.LocalEndpoint.ServiceName)
68+
if err != nil {
69+
return nil, err
70+
}
71+
6372
for _, a := range s.Annotations {
64-
tA, err := annToThrift(a)
65-
if err != nil {
66-
return nil, err
67-
}
73+
tA := annToThrift(a, *localE)
6874
tSpan.Annotations = append(tSpan.Annotations, tA)
6975
}
7076

@@ -73,25 +79,50 @@ func spanV2ToThrift(s models.Span) (*zipkincore.Span, error) {
7379
Key: k,
7480
Value: []byte(v),
7581
AnnotationType: zipkincore.AnnotationType_STRING,
76-
// TODO endpoint
82+
Host: localE,
7783
}
7884
tSpan.BinaryAnnotations = append(tSpan.BinaryAnnotations, ba)
7985
}
8086

87+
if s.Kind == models.SpanKindCLIENT {
88+
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
89+
Value: zipkincore.CLIENT_SEND,
90+
Host: localE,
91+
Timestamp: s.Timestamp,
92+
})
93+
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
94+
Value: zipkincore.CLIENT_RECV,
95+
Host: localE,
96+
Timestamp: s.Timestamp + s.Duration,
97+
})
98+
} else if s.Kind == models.SpanKindSERVER {
99+
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
100+
Value: zipkincore.SERVER_RECV,
101+
Host: localE,
102+
Timestamp: s.Timestamp,
103+
})
104+
tSpan.Annotations = append(tSpan.Annotations, &zipkincore.Annotation{
105+
Value: zipkincore.SERVER_SEND,
106+
Host: localE,
107+
Timestamp: s.Timestamp + s.Duration,
108+
})
109+
}
110+
// TODO consumer producer
111+
81112
// TODO
82-
//s.Kind
83-
//s.LocalEndpoint
84113
//s.RemoteEndpoint
85114

115+
fmt.Println("V2 span")
116+
fmt.Println(tSpan)
117+
86118
return tSpan, nil
87119
}
88120

89-
func annToThrift(a *models.Annotation) (*zipkincore.Annotation, error) {
121+
func annToThrift(a *models.Annotation, e zipkincore.Endpoint) *zipkincore.Annotation {
90122
ta := &zipkincore.Annotation{
91123
Value: a.Value,
92124
Timestamp: a.Timestamp,
93-
// TODO host - endpoint
125+
Host: &e,
94126
}
95-
96-
return ta, nil
127+
return ta
97128
}
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package zipkin
2+
3+
import (
4+
"testing"
5+
//"github.com/go-openapi/loads"
6+
//"github.com/uber/jaeger/swagger-gen/restapi"
7+
"github.com/stretchr/testify/require"
8+
//"github.com/uber/jaeger/swagger-gen/restapi/operations"
9+
"fmt"
10+
"github.com/go-openapi/swag"
11+
"github.com/uber/jaeger/swagger-gen/models"
12+
)
13+
14+
func Test(t *testing.T) {
15+
json := `[{"traceId":"5a056a4b7553fe28700fc2d99e03c627","id":"700fc2d99e03c627","name":"hovadina","localEndpoint":{"serviceName":"foo","ipv4":"10.43.17.43"}}]`
16+
17+
//swaggerSpec, err := loads.Analyzed(restapi.SwaggerJSON, "")
18+
//require.NoError(t, err)
19+
//zipkinV2API := operations.NewZipkinAPI(swaggerSpec)
20+
21+
var spans []models.Span
22+
err := swag.ReadJSON([]byte(json), &spans)
23+
require.NoError(t, err)
24+
25+
fmt.Println(spans[0])
26+
27+
var spansType models.ListOfSpans
28+
err = swag.ReadJSON([]byte(json), &spansType)
29+
require.NoError(t, err)
30+
fmt.Println(spansType[0])
31+
}

crossdock/services/query.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ func (s *queryService) GetTraces(serviceName, operation string, tags map[string]
6464
for k, v := range tags {
6565
values.Add("tag", k+":"+v)
6666
}
67-
resp, err := http.Get(fmt.Sprintf(getTraceURL(s.url), values.Encode()))
67+
url := fmt.Sprintf(getTraceURL(s.url), values.Encode())
68+
resp, err := http.Get(url)
6869
if err != nil {
6970
return nil, err
7071
}
7172
defer resp.Body.Close()
7273
body, _ := ioutil.ReadAll(resp.Body)
73-
s.logger.Info("Retrieved trace from query", zap.String("body", string(body)))
74+
s.logger.Info("Retrieved trace from query", zap.String("body", string(body)), zap.String("url", url))
7475

7576
var queryResponse response
7677
if err = json.Unmarshal(body, &queryResponse); err != nil {

0 commit comments

Comments
 (0)