Skip to content

Commit 9870dbe

Browse files
veerbia, andrewlock, bouwkast
authored
Switch .NET tracer to injecting both base64 & binary headers (#6448)
## Summary of changes

- Switch .NET tracer to injecting both base64 & binary headers
- Binary headers can be disabled via a config variable: `DD_DATA_STREAMS_LEGACY_HEADERS=false` (defaults to true)
- When extracting the context from headers, the .NET tracer looks at both base64 & binary headers

## Reason for change

Data Streams previously used binary encoding in Kafka headers. This was causing issues in cross-language communication because of the difference in negative byte handling. That's why we switched to base64 encoding. Today, .NET is the only remaining tracer using binary encoding for Kafka, SQS & RabbitMQ. This is causing 3 issues:

- Communication between .NET & Java breaks, since Java doesn't support negative bytes in Kafka headers
- Manual instrumentation breaks between .NET and any other language, since manual instrumentation uses base64 encoding
- .NET can't consume payloads from any other tracers, since the other tracers encode in base64

Also, byte headers are causing a crash of the [.NET application](https://datadog.zendesk.com/agent/tickets/1824351) (not reproduced yet).

## Implementation details

- Injects base64-encoded headers by default to ensure cross-language compatibility.
- Binary headers are injected only if `DD_DATA_STREAMS_LEGACY_HEADERS` (default: true) is enabled for backward compatibility.
- Prefers base64 headers for extraction. If base64 is unavailable or malformed, it falls back to binary headers when legacy headers are enabled.
- A new configuration setting `DD_DATA_STREAMS_LEGACY_HEADERS` controls whether binary headers are included.

## Test coverage

I've added tests for the following cases:

- `Inject_WhenLegacyHeadersDisabled_DoesNotIncludeBinaryHeader` - Ensures that when legacy headers are disabled, the binary header is not included in the injected headers.
- `Extract_WhenBothHeadersPresent_PrefersBase64Header` - Confirms that when both Base64 and binary headers are present, the Base64 header is preferred for extraction.
- `InjectedHeaders_HaveCorrectFormat` - Validates that injected headers are in the correct format, especially checking if the Base64-encoded header is valid and correctly decoded.
- `InjectHeaders_WhenLegacyHeadersDisabled_DoesNotIncludeLegacyHeader` - Ensures that legacy headers are excluded when legacy header support is disabled.
- `Inject_WhenLegacyHeadersEnabled_IncludesBothHeaders` - Confirms that both Base64 and binary headers are injected when legacy header support is enabled.
- `Extract_WhenBase64HeaderIsMalformed_ReturnsFallbackToBinary` - Verifies that if the Base64 header is malformed, the system falls back to using the binary header for extraction.

## Other details

<!-- Fixes #{issue} -->
<!-- ⚠️ Note: where possible, please obtain 2 approvals prior to merging. Unless CODEOWNERS specifies otherwise, for external teams it is typically best to have one review from a team member, and one review from apm-dotnet. Trivial changes do not require 2 reviews. -->

---------

Co-authored-by: Andrew Lock <andrew.lock@datadoghq.com>
Co-authored-by: Steven Bouwkamp <steven.bouwkamp@datadoghq.com>
1 parent 41d9038 commit 9870dbe

14 files changed

+418
-102
lines changed

tracer/src/Datadog.Trace/Configuration/ConfigurationKeys.cs

+7
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,13 @@ internal static class DataStreamsMonitoring
819819
/// </summary>
820820
/// <see cref="TracerSettings.IsDataStreamsMonitoringEnabled"/>
821821
public const string Enabled = "DD_DATA_STREAMS_ENABLED";
822+
823+
/// <summary>
824+
/// Configuration key for enabling legacy binary headers in Data Streams Monitoring.
825+
/// Default is true.
826+
/// </summary>
827+
/// <see cref="TracerSettings.IsDataStreamsLegacyHeadersEnabled"/>
828+
public const string LegacyHeadersEnabled = "DD_DATA_STREAMS_LEGACY_HEADERS";
822829
}
823830
}
824831
}

tracer/src/Datadog.Trace/Configuration/TracerSettings.cs

+9
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,10 @@ _ when x.ToBoolean() is { } boolean => boolean,
520520
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.Enabled)
521521
.AsBool(false);
522522

523+
IsDataStreamsLegacyHeadersEnabled = config
524+
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled)
525+
.AsBool(true);
526+
523527
IsRareSamplerEnabled = config
524528
.WithKeys(ConfigurationKeys.RareSamplerEnabled)
525529
.AsBool(false);
@@ -985,6 +989,11 @@ public bool DiagnosticSourceEnabled
985989
/// </summary>
986990
internal bool IsDataStreamsMonitoringEnabled => DynamicSettings.DataStreamsMonitoringEnabled ?? _isDataStreamsMonitoringEnabled;
987991

992+
/// <summary>
993+
/// Gets a value indicating whether to inject legacy binary headers for Data Streams.
994+
/// </summary>
995+
internal bool IsDataStreamsLegacyHeadersEnabled { get; }
996+
988997
/// <summary>
989998
/// Gets a value indicating whether the rare sampler is enabled or not.
990999
/// </summary>

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsContextPropagator.cs

+96-5
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
5-
65
#nullable enable
7-
86
using System;
97
using System.Text;
108
using Datadog.Trace.Headers;
9+
using Datadog.Trace.Logging;
1110
using Datadog.Trace.Util;
11+
using Datadog.Trace.VendoredMicrosoftCode.System.Buffers;
12+
using Datadog.Trace.VendoredMicrosoftCode.System.Buffers.Text;
1213

1314
namespace Datadog.Trace.DataStreamsMonitoring;
1415

@@ -17,6 +18,8 @@ namespace Datadog.Trace.DataStreamsMonitoring;
1718
/// </summary>
1819
internal class DataStreamsContextPropagator
1920
{
21+
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<DataStreamsContextPropagator>();
22+
2023
public static DataStreamsContextPropagator Instance { get; } = new();
2124

2225
/// <summary>
@@ -28,10 +31,42 @@ internal class DataStreamsContextPropagator
2831
/// <typeparam name="TCarrier">Type of header collection</typeparam>
2932
public void Inject<TCarrier>(PathwayContext context, TCarrier headers)
    where TCarrier : IBinaryHeadersCollection
{
    Inject(context, headers, Tracer.Instance.Settings.IsDataStreamsLegacyHeadersEnabled);
}

/// <summary>
/// Writes <paramref name="context"/> into <paramref name="headers"/> as a base64 header and,
/// when <paramref name="isDataStreamsLegacyHeadersEnabled"/> is set, additionally as the raw binary header.
/// Internal so tests can control the legacy-header switch directly.
/// </summary>
/// <param name="context">The pathway context to propagate.</param>
/// <param name="headers">The header collection that receives the encoded context.</param>
/// <param name="isDataStreamsLegacyHeadersEnabled">Whether the binary header is written alongside the base64 one.</param>
/// <typeparam name="TCarrier">Type of header collection</typeparam>
internal void Inject<TCarrier>(PathwayContext context, TCarrier headers, bool isDataStreamsLegacyHeadersEnabled)
    where TCarrier : IBinaryHeadersCollection
{
    if (headers is null) { ThrowHelper.ThrowArgumentNullException(nameof(headers)); }

    var rawContext = PathwayContextEncoder.Encode(context);

    // Base64 turns every 3 input bytes into 4 output bytes, so the output needs
    // ceil(length / 3) * 4 bytes; with integer arithmetic that is ((length + 2) / 3) * 4.
    var base64Buffer = new byte[((rawContext.Length + 2) / 3) * 4];
    var encodeStatus = Base64.EncodeToUtf8(rawContext, base64Buffer, out _, out int written);

    if (encodeStatus != OperationStatus.Done)
    {
        Log.Error("Failed to encode Data Streams context to Base64. OperationStatus: {Status}", encodeStatus);
        return;
    }

    // Only copy when the encoder wrote fewer bytes than the buffer holds.
    var base64Header = written == base64Buffer.Length
                           ? base64Buffer
                           : base64Buffer.AsSpan(0, written).ToArray();
    headers.Add(DataStreamsPropagationHeaders.PropagationKeyBase64, base64Header);

    if (isDataStreamsLegacyHeadersEnabled)
    {
        headers.Add(DataStreamsPropagationHeaders.PropagationKey, rawContext);
    }
}
3671

3772
/// <summary>
@@ -42,12 +77,68 @@ public void Inject<TCarrier>(PathwayContext context, TCarrier headers)
4277
/// <returns>A new <see cref="PathwayContext"/> that contains the values obtained from <paramref name="headers"/>.</returns>
4378
public PathwayContext? Extract<TCarrier>(TCarrier headers)
    where TCarrier : IBinaryHeadersCollection
    => Extract(headers, Tracer.Instance.Settings.IsDataStreamsLegacyHeadersEnabled);

/// <summary>
/// Reads a <see cref="PathwayContext"/> from <paramref name="headers"/>. The base64 header is tried first;
/// if it is absent, empty, or cannot be decoded, the binary header is tried next, but only when
/// <paramref name="isDataStreamsLegacyHeadersEnabled"/> is set.
/// Internal so tests can control the legacy-header switch directly.
/// </summary>
/// <param name="headers">The header collection to read from.</param>
/// <param name="isDataStreamsLegacyHeadersEnabled">Whether the binary header may be used as a fallback.</param>
/// <typeparam name="TCarrier">Type of header collection</typeparam>
/// <returns>The decoded context, or <c>null</c> when no usable header was found.</returns>
internal PathwayContext? Extract<TCarrier>(TCarrier headers, bool isDataStreamsLegacyHeadersEnabled)
    where TCarrier : IBinaryHeadersCollection
{
    if (headers is null) { ThrowHelper.ThrowArgumentNullException(nameof(headers)); }

    // Try to extract from the base64 header first
    var base64Bytes = headers.TryGetLastBytes(DataStreamsPropagationHeaders.PropagationKeyBase64);
    if (base64Bytes is { Length: > 0 })
    {
        try
        {
            // Base64 encodes 3 bytes of data into 4 bytes of text, so (length * 3) / 4
            // is an upper bound for the decoded size.
            var decodedBytes = new byte[(base64Bytes.Length * 3) / 4];
            var status = Base64.DecodeFromUtf8(base64Bytes, decodedBytes, out _, out int bytesWritten);

            if (status == OperationStatus.Done)
            {
                // Padding makes the real payload shorter than the upper bound; trim only when needed.
                return PathwayContextEncoder.Decode(
                    bytesWritten == decodedBytes.Length
                        ? decodedBytes
                        : decodedBytes.AsSpan(0, bytesWritten).ToArray());
            }

            // Do not return here: a malformed base64 header must still allow the
            // legacy binary header below to be used, same as the exception path.
            Log.Error("Failed to decode Base64 data streams context. OperationStatus: {Status}", status);
        }
        catch (Exception ex)
        {
            Log.Error(ex, "Failed to decode base64 Data Streams context.");
        }
    }

    if (isDataStreamsLegacyHeadersEnabled)
    {
        var binaryBytes = headers.TryGetLastBytes(DataStreamsPropagationHeaders.PropagationKey);
        if (binaryBytes is { Length: > 0 })
        {
            try
            {
                return PathwayContextEncoder.Decode(binaryBytes);
            }
            catch (Exception ex)
            {
                Log.Error(ex, "Failed to decode binary Data Streams context.");
            }
        }
    }

    return null;
}
52143

53144
/// <summary>

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs

+2-11
Original file line numberDiff line numberDiff line change
@@ -83,21 +83,12 @@ public async Task DisposeAsync()
8383
public void InjectPathwayContext<TCarrier>(PathwayContext? context, TCarrier headers)
    where TCarrier : IBinaryHeadersCollection
{
    // Nothing is written when IsEnabled is false or when there is no context to propagate.
    if (IsEnabled && context is { } pathway)
    {
        DataStreamsContextPropagator.Instance.Inject(pathway, headers);
    }
}
10293

10394
public void TrackBacklog(string tags, long value)

tracer/src/Datadog.Trace/TracerManager.cs

+3
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,9 @@ void WriteDictionary(IReadOnlyDictionary<string, string> dictionary)
528528
writer.WritePropertyName("data_streams_enabled");
529529
writer.WriteValue(instanceSettings.IsDataStreamsMonitoringEnabled);
530530

531+
writer.WritePropertyName("data_streams_legacy_headers_enabled");
532+
writer.WriteValue(instanceSettings.IsDataStreamsLegacyHeadersEnabled);
533+
531534
writer.WritePropertyName("span_sampling_rules");
532535
writer.WriteValue(instanceSettings.SpanSamplingRules);
533536

tracer/test/Datadog.Trace.ClrProfiler.IntegrationTests/DataStreamsMonitoringKafkaTests.cs

+14-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// </copyright>
55

66
using System;
7+
using System.Collections.Generic;
78
using System.Threading.Tasks;
89
using Datadog.Trace.Configuration;
910
using Datadog.Trace.TestHelpers;
@@ -27,6 +28,14 @@ public DataStreamsMonitoringKafkaTests(ITestOutputHelper output)
2728
SetServiceVersion("1.0.0");
2829
}
2930

31+
/// <summary>
/// Yields every combination of the two boolean arguments passed to <c>SubmitsDataStreams</c>
/// (consumer scope creation, legacy headers), in the order (true, true), (true, false),
/// (false, true), (false, false).
/// </summary>
public static IEnumerable<object[]> GetKafkaTestData()
{
    var flags = new[] { true, false };

    foreach (var enableConsumerScopeCreation in flags)
    {
        foreach (var enableLegacyHeaders in flags)
        {
            yield return new object[] { enableConsumerScopeCreation, enableLegacyHeaders };
        }
    }
}
38+
3039
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
3140
/// <summary>
3241
/// This sample does a series of produces and consumes to create two pipelines:
@@ -57,15 +66,16 @@ public DataStreamsMonitoringKafkaTests(ITestOutputHelper output)
5766
/// C2->>+T3: Produce
5867
/// </summary>
5968
/// <param name="enableConsumerScopeCreation">Is the scope created manually or using built-in support</param>
69+
/// <param name="enableLegacyHeaders">Should legacy headers be enabled?</param>
6070
[SkippableTheory]
61-
[InlineData(true)]
62-
[InlineData(false)]
71+
[MemberData(nameof(GetKafkaTestData))]
6372
[Trait("Category", "EndToEnd")]
6473
[Trait("Category", "ArmUnsupported")]
65-
public async Task SubmitsDataStreams(bool enableConsumerScopeCreation)
74+
public async Task SubmitsDataStreams(bool enableConsumerScopeCreation, bool enableLegacyHeaders)
6675
{
6776
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.Enabled, "1");
6877
SetEnvironmentVariable(ConfigurationKeys.KafkaCreateConsumerScopeEnabled, enableConsumerScopeCreation ? "1" : "0");
78+
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled, enableLegacyHeaders ? "1" : "0");
6979

7080
using var agent = EnvironmentHelper.GetMockAgent(useTelemetry: true);
7181

@@ -100,6 +110,7 @@ public async Task HandlesBatchProcessing()
100110
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.Enabled, "1");
101111
// set variable to create short spans on receive instead of spans that last until the next consume
102112
SetEnvironmentVariable(ConfigurationKeys.KafkaCreateConsumerScopeEnabled, "0");
113+
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled, "1");
103114

104115
using var agent = EnvironmentHelper.GetMockAgent(useTelemetry: true);
105116

tracer/test/Datadog.Trace.ClrProfiler.IntegrationTests/DataStreamsMonitoringRabbitMQTests.cs

+2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public DataStreamsMonitoringRabbitMQTests(ITestOutputHelper output)
3535
public async Task HandleProduceAndConsume(string packageVersion)
3636
{
3737
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.Enabled, "1");
38+
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled, "1");
3839

3940
using var assertionScope = new AssertionScope();
4041
using var agent = EnvironmentHelper.GetMockAgent();
@@ -58,6 +59,7 @@ await Verifier.Verify(PayloadsToPoints(agent.DataStreams), settings)
5859
public async Task ValidateSpanTags(string packageVersion)
5960
{
6061
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.Enabled, "1");
62+
SetEnvironmentVariable(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled, "1");
6163

6264
using var assertionScope = new AssertionScope();
6365
using var agent = EnvironmentHelper.GetMockAgent();

tracer/test/Datadog.Trace.Tests/Configuration/TracerSettingsTests.cs

+10
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,16 @@ public void IsDataStreamsMonitoringEnabled(string value, bool expected)
974974
settings.IsDataStreamsMonitoringEnabled.Should().Be(expected);
975975
}
976976

977+
[Theory]
[MemberData(nameof(BooleanTestCases), true)]
public void IsDataStreamsLegacyHeadersEnabled(string value, bool expected)
{
    // Build settings from a source that carries only the legacy-headers key.
    var settings = new TracerSettings(
        CreateConfigurationSource((ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled, value)));

    settings.IsDataStreamsLegacyHeadersEnabled.Should().Be(expected);
}
986+
977987
[Theory]
978988
[MemberData(nameof(BooleanTestCases), false)]
979989
public void IsRareSamplerEnabled(string value, bool expected)

0 commit comments

Comments
 (0)