|
1 |
| -/* |
2 |
| - * Copyright (c) 2017-2018 Uber Technologies, Inc. |
3 |
| - * |
4 |
| - * Licensed under the Apache License, Version 2.0 (the "License"); |
5 |
| - * you may not use this file except in compliance with the License. |
6 |
| - * You may obtain a copy of the License at |
7 |
| - * |
8 |
| - * http://www.apache.org/licenses/LICENSE-2.0 |
9 |
| - * |
10 |
| - * Unless required by applicable law or agreed to in writing, software |
11 |
| - * distributed under the License is distributed on an "AS IS" BASIS, |
12 |
| - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 |
| - * See the License for the specific language governing permissions and |
14 |
| - * limitations under the License. |
15 |
| - */ |
16 |
| - |
17 |
| -#include "jaegertracing/UDPTransport.h" |
18 |
| - |
19 |
| -#include "jaegertracing/Span.h" |
20 |
| -#include "jaegertracing/Tag.h" |
21 |
| -#include "jaegertracing/Tracer.h" |
22 |
| -#include <algorithm> |
23 |
| -#include <cstdint> |
24 |
| -#include <iostream> |
25 |
| -#include <iterator> |
26 |
| -#include <string> |
27 |
| -#include <thrift/protocol/TCompactProtocol.h> |
28 |
| -#include <thrift/transport/TBufferTransports.h> |
29 |
| - |
30 |
| -#ifdef _MSC_VER |
31 |
| -#pragma warning(disable : 4267) // Conversion from unsigned to signed. It should not be a problem here. |
32 |
| -#endif |
33 |
| - |
34 |
| -namespace jaegertracing { |
35 |
| -namespace net { |
36 |
| -class IPAddress; |
37 |
| -} // namespace net |
38 |
| - |
39 |
| -namespace { |
40 |
| - |
41 |
| -constexpr auto kEmitBatchOverhead = 30; |
42 |
| - |
43 |
| -template <typename ThriftType> |
44 |
| -int calcSizeOfSerializedThrift(const ThriftType& base, int maxPacketSize) |
45 |
| -{ |
46 |
| - std::shared_ptr<apache::thrift::transport::TMemoryBuffer> buffer( |
47 |
| - new apache::thrift::transport::TMemoryBuffer(maxPacketSize)); |
48 |
| - apache::thrift::protocol::TCompactProtocolFactory factory; |
49 |
| - auto protocol = factory.getProtocol(buffer); |
50 |
| - base.write(protocol.get()); |
51 |
| - uint8_t* data = nullptr; |
52 |
| - uint32_t size = 0; |
53 |
| - buffer->getBuffer(&data, &size); |
54 |
| - return size; |
55 |
| -} |
56 |
| - |
57 |
| -} // anonymous namespace |
58 |
| - |
59 |
| -UDPTransport::UDPTransport(const net::IPAddress& ip, int maxPacketSize) |
60 |
| - : _client(new utils::UDPClient(ip, maxPacketSize)) |
61 |
| - , _maxSpanBytes(0) |
62 |
| - , _byteBufferSize(0) |
63 |
| - , _processByteSize(0) |
64 |
| -{ |
65 |
| -} |
66 |
| - |
67 |
| -int UDPTransport::append(const Span& span) |
68 |
| -{ |
69 |
| - if (_process.serviceName.empty()) { |
70 |
| - const auto& tracer = static_cast<const Tracer&>(span.tracer()); |
71 |
| - _process.serviceName = tracer.serviceName(); |
72 |
| - |
73 |
| - const auto& tracerTags = tracer.tags(); |
74 |
| - std::vector<thrift::Tag> thriftTags; |
75 |
| - thriftTags.reserve(tracerTags.size()); |
76 |
| - std::transform(std::begin(tracerTags), |
77 |
| - std::end(tracerTags), |
78 |
| - std::back_inserter(thriftTags), |
79 |
| - [](const Tag& tag) { return tag.thrift(); }); |
80 |
| - _process.__set_tags(thriftTags); |
81 |
| - |
82 |
| - _processByteSize = |
83 |
| - calcSizeOfSerializedThrift(_process, _client->maxPacketSize()); |
84 |
| - _maxSpanBytes = |
85 |
| - _client->maxPacketSize() - _processByteSize - kEmitBatchOverhead; |
86 |
| - } |
87 |
| - const auto jaegerSpan = span.thrift(); |
88 |
| - const auto spanSize = |
89 |
| - calcSizeOfSerializedThrift(jaegerSpan, _client->maxPacketSize()); |
90 |
| - if (spanSize > _maxSpanBytes) { |
91 |
| - std::ostringstream oss; |
92 |
| - throw Transport::Exception("Span is too large", 1); |
93 |
| - } |
94 |
| - |
95 |
| - _byteBufferSize += spanSize; |
96 |
| - if (_byteBufferSize <= _maxSpanBytes) { |
97 |
| - _spanBuffer.push_back(jaegerSpan); |
98 |
| - if (_byteBufferSize < _maxSpanBytes) { |
99 |
| - return 0; |
100 |
| - } |
101 |
| - return flush(); |
102 |
| - } |
103 |
| - |
104 |
| - // Flush currently full buffer, then append this span to buffer. |
105 |
| - const auto flushed = flush(); |
106 |
| - _spanBuffer.push_back(jaegerSpan); |
107 |
| - _byteBufferSize = spanSize + _processByteSize; |
108 |
| - return flushed; |
109 |
| -} |
110 |
| - |
111 |
| -int UDPTransport::flush() |
112 |
| -{ |
113 |
| - if (_spanBuffer.empty()) { |
114 |
| - return 0; |
115 |
| - } |
116 |
| - |
117 |
| - thrift::Batch batch; |
118 |
| - batch.__set_process(_process); |
119 |
| - batch.__set_spans(_spanBuffer); |
120 |
| - |
121 |
| - try { |
122 |
| - _client->emitBatch(batch); |
123 |
| - } catch (const std::system_error& ex) { |
124 |
| - std::ostringstream oss; |
125 |
| - oss << "Could not send span " << ex.what() |
126 |
| - << ", code=" << ex.code().value(); |
127 |
| - throw Transport::Exception(oss.str(), _spanBuffer.size()); |
128 |
| - } catch (const std::exception& ex) { |
129 |
| - std::ostringstream oss; |
130 |
| - oss << "Could not send span " << ex.what(); |
131 |
| - throw Transport::Exception(oss.str(), _spanBuffer.size()); |
132 |
| - } catch (...) { |
133 |
| - throw Transport::Exception("Could not send span, unknown error", |
134 |
| - _spanBuffer.size()); |
135 |
| - } |
136 |
| - |
137 |
| - resetBuffers(); |
138 |
| - |
139 |
| - return batch.spans.size(); |
140 |
| -} |
141 |
| - |
142 |
| -} // namespace jaegertracing |
| 1 | +/* |
| 2 | + * Copyright (c) 2017-2018 Uber Technologies, Inc. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +#include "jaegertracing/ThriftTransport.h" |
| 18 | + |
| 19 | +#include "jaegertracing/Span.h" |
| 20 | +#include "jaegertracing/Tag.h" |
| 21 | +#include "jaegertracing/Tracer.h" |
| 22 | +#include <algorithm> |
| 23 | +#include <cstdint> |
| 24 | +#include <iostream> |
| 25 | +#include <iterator> |
| 26 | +#include <thrift/transport/TBufferTransports.h> |
| 27 | + |
| 28 | +#ifdef _MSC_VER |
| 29 | +#pragma warning(disable : 4267) // Conversion from unsigned to signed. It should not be a problem here. |
| 30 | +#endif |
| 31 | + |
| 32 | +namespace jaegertracing { |
| 33 | +namespace net { |
| 34 | +class IPAddress; |
| 35 | +} // namespace net |
| 36 | + |
| 37 | +namespace { |
| 38 | + |
| 39 | +constexpr auto kEmitBatchOverhead = 30; |
| 40 | + |
| 41 | +template <typename ThriftType> |
| 42 | +int calcSizeOfSerializedThrift(apache::thrift::protocol::TProtocolFactory& factory, const ThriftType& base, int maxPacketSize) |
| 43 | +{ |
| 44 | + std::shared_ptr<apache::thrift::transport::TMemoryBuffer> buffer( |
| 45 | + new apache::thrift::transport::TMemoryBuffer(maxPacketSize)); |
| 46 | + auto protocol = factory.getProtocol(buffer); |
| 47 | + base.write(protocol.get()); |
| 48 | + uint8_t* data = nullptr; |
| 49 | + uint32_t size = 0; |
| 50 | + buffer->getBuffer(&data, &size); |
| 51 | + return size; |
| 52 | +} |
| 53 | + |
| 54 | +} // anonymous namespace |
| 55 | + |
| 56 | +ThriftTransport::ThriftTransport(const net::IPAddress& ip, |
| 57 | + int maxPacketSize) |
| 58 | + : _sender(new utils::UDPSender(ip, maxPacketSize)) |
| 59 | + , _maxSpanBytes(0) |
| 60 | + , _byteBufferSize(0) |
| 61 | + , _processByteSize(0) |
| 62 | + , _protocolFactory(_sender->protocolFactory()) |
| 63 | +{ |
| 64 | +} |
| 65 | + |
| 66 | +ThriftTransport::ThriftTransport(const net::URI& endpoint, |
| 67 | + int maxPacketSize) |
| 68 | + : _sender(new utils::HttpSender(endpoint, maxPacketSize)) |
| 69 | + , _maxSpanBytes(0) |
| 70 | + , _byteBufferSize(0) |
| 71 | + , _processByteSize(0) |
| 72 | + , _protocolFactory(_sender->protocolFactory()) |
| 73 | +{ |
| 74 | +} |
| 75 | + |
| 76 | + |
| 77 | +int ThriftTransport::append(const Span& span) |
| 78 | +{ |
| 79 | + if (_process.serviceName.empty()) { |
| 80 | + const auto& tracer = static_cast<const Tracer&>(span.tracer()); |
| 81 | + _process.serviceName = tracer.serviceName(); |
| 82 | + |
| 83 | + const auto& tracerTags = tracer.tags(); |
| 84 | + std::vector<thrift::Tag> thriftTags; |
| 85 | + thriftTags.reserve(tracerTags.size()); |
| 86 | + std::transform(std::begin(tracerTags), |
| 87 | + std::end(tracerTags), |
| 88 | + std::back_inserter(thriftTags), |
| 89 | + [](const Tag& tag) { return tag.thrift(); }); |
| 90 | + _process.__set_tags(thriftTags); |
| 91 | + |
| 92 | + _processByteSize = |
| 93 | + calcSizeOfSerializedThrift(*_protocolFactory, _process, _sender->maxPacketSize()); |
| 94 | + _maxSpanBytes = |
| 95 | + _sender->maxPacketSize() - _processByteSize - kEmitBatchOverhead; |
| 96 | + } |
| 97 | + const auto jaegerSpan = span.thrift(); |
| 98 | + const auto spanSize = |
| 99 | + calcSizeOfSerializedThrift(*_protocolFactory, jaegerSpan, _sender->maxPacketSize()); |
| 100 | + if (spanSize > _maxSpanBytes) { |
| 101 | + std::ostringstream oss; |
| 102 | + throw Transport::Exception("Span is too large", 1); |
| 103 | + } |
| 104 | + |
| 105 | + _byteBufferSize += spanSize; |
| 106 | + if (_byteBufferSize <= _maxSpanBytes) { |
| 107 | + _spanBuffer.push_back(jaegerSpan); |
| 108 | + if (_byteBufferSize < _maxSpanBytes) { |
| 109 | + return 0; |
| 110 | + } |
| 111 | + return flush(); |
| 112 | + } |
| 113 | + |
| 114 | + // Flush currently full buffer, then append this span to buffer. |
| 115 | + const auto flushed = flush(); |
| 116 | + _spanBuffer.push_back(jaegerSpan); |
| 117 | + _byteBufferSize = spanSize + _processByteSize; |
| 118 | + return flushed; |
| 119 | +} |
| 120 | + |
| 121 | +int ThriftTransport::flush() |
| 122 | +{ |
| 123 | + if (_spanBuffer.empty()) { |
| 124 | + return 0; |
| 125 | + } |
| 126 | + |
| 127 | + thrift::Batch batch; |
| 128 | + batch.__set_process(_process); |
| 129 | + batch.__set_spans(_spanBuffer); |
| 130 | + |
| 131 | + try { |
| 132 | + _sender->emitBatch(batch); |
| 133 | + } catch (const std::system_error& ex) { |
| 134 | + std::ostringstream oss; |
| 135 | + oss << "Could not send span " << ex.what() |
| 136 | + << ", code=" << ex.code().value(); |
| 137 | + throw Transport::Exception(oss.str(), _spanBuffer.size()); |
| 138 | + } catch (const std::exception& ex) { |
| 139 | + std::ostringstream oss; |
| 140 | + oss << "Could not send span " << ex.what(); |
| 141 | + throw Transport::Exception(oss.str(), _spanBuffer.size()); |
| 142 | + } catch (...) { |
| 143 | + throw Transport::Exception("Could not send span, unknown error", |
| 144 | + _spanBuffer.size()); |
| 145 | + } |
| 146 | + |
| 147 | + resetBuffers(); |
| 148 | + |
| 149 | + return batch.spans.size(); |
| 150 | +} |
| 151 | + |
| 152 | +} // namespace jaegertracing |
0 commit comments