Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add network support to the Kafka container #1316

Merged
merged 5 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
<PackageVersion Include="Azure.Storage.Blobs" Version="12.17.0"/>
<PackageVersion Include="Azure.Storage.Queues" Version="12.15.0"/>
<PackageVersion Include="ClickHouse.Client" Version="7.9.1"/>
<PackageVersion Include="Confluent.Kafka" Version="2.0.2"/>
<PackageVersion Include="Confluent.Kafka" Version="2.8.0"/>
<PackageVersion Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.8.0"/>
<PackageVersion Include="Confluent.SchemaRegistry" Version="2.8.0"/>
<PackageVersion Include="Consul" Version="1.6.10.9"/>
<PackageVersion Include="CouchbaseNetClient" Version="3.6.4"/>
<PackageVersion Include="DotPulsar" Version="3.3.2"/>
Expand Down
56 changes: 53 additions & 3 deletions src/Testcontainers.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer

public const ushort BrokerPort = 9093;

public const ushort ControllerPort = 9094;

public const ushort ZookeeperPort = 2181;

public const string StartupScriptFilePath = "/testcontainers.sh";

private const string ProtocolPrefix = "TC";

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
Expand Down Expand Up @@ -43,6 +47,49 @@ public override KafkaContainer Build()
return new KafkaContainer(DockerResourceConfiguration);
}

/// <summary>
/// Adds a listener to the Kafka configuration in the format <c>host:port</c>.
/// </summary>
/// <remarks>
/// The host will be included as a network alias, allowing additional connections
/// to the Kafka broker within the same container network.
///
/// This method is useful for registering custom listeners beyond the default ones,
/// enabling specific connection points for Kafka brokers.
///
/// Default listeners include:
/// - <c>PLAINTEXT://0.0.0.0:9092</c>
/// - <c>BROKER://0.0.0.0:9093</c>
/// - <c>CONTROLLER://0.0.0.0:9094</c>
/// </remarks>
/// <param name="kafka">The MsSql database.</param>
/// <returns>A configured instance of <see cref="KafkaBuilder" />.</returns>
public KafkaBuilder WithListener(string kafka)
{
var index = DockerResourceConfiguration.Listeners?.Count() ?? 0;
var protocol = $"{ProtocolPrefix}-{index}";
var listener = $"{protocol}://{kafka}";
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";

var listeners = new[] { listener };
var listenersSecurityProtocolMap = new[] { listenerSecurityProtocolMap };

var host = kafka.Split(':')[0];

var updatedListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
.Split(',')
.Concat(listeners);

var updatedListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
.Split(',')
.Concat(listenersSecurityProtocolMap);

return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
.WithEnvironment("KAFKA_LISTENERS", string.Join(",", updatedListeners))
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", updatedListenersSecurityProtocolMap))
.WithNetworkAliases(host);
}

/// <inheritdoc />
protected override KafkaBuilder Init()
{
Expand All @@ -51,10 +98,12 @@ protected override KafkaBuilder Init()
.WithPortBinding(KafkaPort, true)
.WithPortBinding(BrokerPort, true)
.WithPortBinding(ZookeeperPort, true)
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort)
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
Expand All @@ -68,6 +117,7 @@ protected override KafkaBuilder Init()
.WithStartupCallback((container, ct) =>
{
const char lf = '\n';
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);
Expand All @@ -79,7 +129,7 @@ protected override KafkaBuilder Init()
startupScript.Append(lf);
startupScript.Append("zookeeper-server-start zookeeper.properties &");
startupScript.Append(lf);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners);
startupScript.Append(lf);
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
startupScript.Append(lf);
Expand Down
20 changes: 19 additions & 1 deletion src/Testcontainers.Kafka/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ public sealed class KafkaConfiguration : ContainerConfiguration
/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
public KafkaConfiguration()
/// <param name="listeners">A list of listeners.</param>
/// <param name="advertisedListeners">A list of advertised listeners.</param>
public KafkaConfiguration(
IEnumerable<string> listeners = null,
IEnumerable<string> advertisedListeners = null)
{
Listeners = listeners;
AdvertisedListeners = advertisedListeners;
}

/// <summary>
Expand Down Expand Up @@ -49,5 +55,17 @@ public KafkaConfiguration(KafkaConfiguration resourceConfiguration)
public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue)
: base(oldValue, newValue)
{
Listeners = BuildConfiguration.Combine(oldValue.Listeners, newValue.Listeners);
AdvertisedListeners = BuildConfiguration.Combine(oldValue.AdvertisedListeners, newValue.AdvertisedListeners);
}

/// <summary>
/// Gets a list of listeners.
/// </summary>
public IEnumerable<string> Listeners { get; }

/// <summary>
/// Gets a list of advertised listeners.
/// </summary>
public IEnumerable<string> AdvertisedListeners { get; }
}
14 changes: 14 additions & 0 deletions src/Testcontainers.Kafka/KafkaContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ namespace Testcontainers.Kafka;
[PublicAPI]
public sealed class KafkaContainer : DockerContainer
{
private readonly KafkaConfiguration _configuration;

/// <summary>
/// Initializes a new instance of the <see cref="KafkaContainer" /> class.
/// </summary>
/// <param name="configuration">The container configuration.</param>
public KafkaContainer(KafkaConfiguration configuration)
: base(configuration)
{
_configuration = configuration;
}

/// <summary>
Expand All @@ -21,4 +24,15 @@ public string GetBootstrapAddress()
{
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
}

/// <summary>
/// Gets a list of advertised listeners.
/// </summary>
public IEnumerable<string> AdvertisedListeners
{
get
{
return _configuration.AdvertisedListeners;
}
}
}
2 changes: 2 additions & 0 deletions src/Testcontainers.Kafka/Usings.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
global using System;
global using System.Collections.Generic;
global using System.Linq;
global using System.Text;
global using Docker.DotNet.Models;
global using DotNet.Testcontainers.Builders;
Expand Down
68 changes: 68 additions & 0 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
namespace Testcontainers.Kafka;

public sealed class KafkaContainerNetworkTest : IAsyncLifetime
{
private const string Message = "Message produced by kafkacat";

private const string Listener = "kafka:19092";

private const string DataFilePath = "/data/msgs.txt";

private readonly INetwork _network;

private readonly IContainer _kafkaContainer;

private readonly IContainer _kCatContainer;

public KafkaContainerNetworkTest()
{
_network = new NetworkBuilder()
.Build();

_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka:6.1.9")
.WithNetwork(_network)
.WithListener(Listener)
.Build();

_kCatContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-kafkacat:6.1.9")
.WithNetwork(_network)
.WithEntrypoint(CommonCommands.SleepInfinity)
.WithResourceMapping(Encoding.Default.GetBytes(Message), DataFilePath)
.Build();
}

public async Task InitializeAsync()
{
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _kCatContainer.StartAsync()
.ConfigureAwait(false);
}

public async Task DisposeAsync()
{
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _kCatContainer.StartAsync()
.ConfigureAwait(false);

await _network.DisposeAsync()
.ConfigureAwait(false);
}

[Fact]
public async Task ConsumesProducedKafkaMessage()
{
_ = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-t", "msgs", "-P", "-l", DataFilePath })
.ConfigureAwait(true);

var execResult = await _kCatContainer.ExecAsync(new[] { "kafkacat", "-b", Listener, "-C", "-t", "msgs", "-c", "1" })
.ConfigureAwait(true);

Assert.Equal(Message, execResult.Stdout.Trim());
}
}
129 changes: 129 additions & 0 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerRegistryTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
namespace Testcontainers.Kafka;

public sealed class KafkaContainerRegistryTest : IAsyncLifetime
{
private const string Schema = @"
{
""$schema"": ""http://json-schema.org/draft-04/schema#"",
""title"": ""User"",
""type"": ""object"",
""additionalProperties"": false,
""properties"": {
""FirstName"": {
""type"": [""null"", ""string""]
},
""LastName"": {
""type"": [""null"", ""string""]
}
}
}";

private const ushort RestPort = 8085;

private const string SchemaRegistryNetworkAlias = "schema-registry";

private const string Listener = "kafka:19092";

private readonly INetwork _network;

private readonly KafkaContainer _kafkaContainer;

private readonly IContainer _schemaRegistryContainer;

public KafkaContainerRegistryTest()
{
_network = new NetworkBuilder()
.Build();

_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka:6.1.9")
.WithNetwork(_network)
.WithListener(Listener)
.Build();

_schemaRegistryContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-schema-registry:6.1.9")
.WithPortBinding(RestPort, true)
.WithNetwork(_network)
.WithNetworkAliases(SchemaRegistryNetworkAlias)
.WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + RestPort)
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + Listener)
.WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", SchemaRegistryNetworkAlias)
.WithWaitStrategy(Wait.ForUnixContainer().UntilHttpRequestIsSucceeded(request =>
request.ForPort(RestPort).ForPath("/subjects")))
.Build();
}

public async Task InitializeAsync()
{
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _schemaRegistryContainer.StartAsync()
.ConfigureAwait(false);
}

public async Task DisposeAsync()
{
await _kafkaContainer.StartAsync()
.ConfigureAwait(false);

await _schemaRegistryContainer.StartAsync()
.ConfigureAwait(false);

await _network.DisposeAsync()
.ConfigureAwait(false);
}

[Fact]
public async Task ConsumerReturnsProducerMessage()
{
// Given
const string topic = "user";

var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topic);

var bootstrapServer = _kafkaContainer.GetBootstrapAddress();

var producerConfig = new ProducerConfig();
producerConfig.BootstrapServers = bootstrapServer;

var consumerConfig = new ConsumerConfig();
consumerConfig.BootstrapServers = bootstrapServer;
consumerConfig.GroupId = "sample-consumer";
consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;

var message = new Message<string, User>();
message.Value = new User("John", "Doe");

var schemaRegistryConfig = new SchemaRegistryConfig();
schemaRegistryConfig.Url = new UriBuilder(Uri.UriSchemeHttp, _schemaRegistryContainer.Hostname, _schemaRegistryContainer.GetMappedPublicPort(RestPort)).ToString();

// When
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
_ = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(Schema, SchemaType.Json))
.ConfigureAwait(true);

using var producer = new ProducerBuilder<string, User>(producerConfig)
.SetValueSerializer(new JsonSerializer<User>(schemaRegistry))
.Build();

_ = await producer.ProduceAsync(topic, message)
.ConfigureAwait(true);

using var consumer = new ConsumerBuilder<string, User>(consumerConfig)
.SetValueDeserializer(new JsonDeserializer<User>().AsSyncOverAsync())
.Build();

consumer.Subscribe(topic);

var result = consumer.Consume(TimeSpan.FromSeconds(15));

// Then
Assert.NotNull(result);
Assert.Equal(message.Value, result.Message.Value);
}

private record User(string FirstName, string LastName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
<PackageReference Include="xunit.runner.visualstudio"/>
<PackageReference Include="xunit"/>
<PackageReference Include="Confluent.Kafka"/>
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Json"/>
<PackageReference Include="Confluent.SchemaRegistry"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../../src/Testcontainers.Kafka/Testcontainers.Kafka.csproj"/>
Expand Down
Loading