Skip to content

Commit 3414683

Browse files
authored
Create topics when bootstrapping Tamer (#1385)
* Create topics when bootstrapping Tamer
* more checks
1 parent 0d2d8b2 commit 3414683

17 files changed

+367
-240
lines changed

core/src/main/scala/tamer/Registry.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,11 @@ object RegistryProvider {
101101
implicit final val defaultRegistryProvider: RegistryProvider = RegistryProvider { config =>
102102
val sttpBackend = sttp.client4.httpclient.zio.HttpClientZioBackend.scoped()
103103
val schemaToIdCache = Cache.makeWithKey(config.cacheSize, Lookup((Registry.SttpRegistry.getOrRegisterId _).tupled))(
104-
_ => 1.hour,
104+
_ => config.expiration,
105105
{ case (_, _, _, subject, schema) => (subject, schema) }
106106
)
107107
val schemaIdToValidationCache = Cache.makeWithKey(config.cacheSize, Lookup((Registry.SttpRegistry.verifySchema _).tupled))(
108-
_ => 1.hour,
108+
_ => config.expiration,
109109
{ case (_, _, _, id, schema) => (id, schema) }
110110
)
111111
val log = log4sFromName.provideEnvironment(ZEnvironment("tamer.SttpRegistry"))

core/src/main/scala/tamer/Tamer.scala

+191-85
Large diffs are not rendered by default.

core/src/main/scala/tamer/config.scala

+41-21
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,53 @@
11
package tamer
22

3+
import java.time.{Duration => JDuration}
4+
35
import zio._
46

5-
final case class RegistryConfig(url: String, cacheSize: Int)
7+
final case class RegistryConfig(url: String, cacheSize: Int, expiration: JDuration)
68
object RegistryConfig {
79
def apply(url: String): RegistryConfig = RegistryConfig(
810
url = url,
9-
cacheSize = 1000
11+
cacheSize = 4,
12+
expiration = 1.hour
1013
)
1114
val config: Config[Option[RegistryConfig]] =
12-
(Config.string("url") ++ Config.int("cache_size").withDefault(1000)).map { case (url, cacheSize) =>
13-
RegistryConfig(url, cacheSize)
15+
(Config.string("url") ++ Config.int("cache_size").withDefault(4) ++ Config.duration("expiration").withDefault(1.hour)).map {
16+
case (url, cacheSize, expiration) => RegistryConfig(url, cacheSize, expiration)
1417
}.optional
1518
}
1619

17-
final case class SinkConfig(topic: String)
18-
object SinkConfig {
19-
val config: Config[SinkConfig] = Config.string("topic").map(SinkConfig.apply)
20+
final case class TopicOptions(partitions: Int, replicas: Short)
21+
object TopicOptions {
22+
val config: Config[Option[TopicOptions]] =
23+
(Config.boolean("auto_create").withDefault(false) ++
24+
Config.int("partitions").withDefault(1) ++
25+
Config.int("replicas").map(_.toShort).withDefault(1.toShort)).map {
26+
case (true, partitions, replicas) => Some(TopicOptions(partitions, replicas))
27+
case _ => None
28+
}
2029
}
2130

22-
final case class StateConfig(topic: String, groupId: String, clientId: String)
23-
object StateConfig {
24-
val config: Config[StateConfig] =
25-
(Config.string("topic") ++ Config.string("group_id") ++ Config.string("client_id")).map { case (stateTopic, stateGroupId, stateClientId) =>
26-
StateConfig(stateTopic, stateGroupId, stateClientId)
27-
}
31+
final case class TopicConfig(topicName: String, maybeTopicOptions: Option[TopicOptions])
32+
object TopicConfig {
33+
def apply(topicName: String): TopicConfig = new TopicConfig(
34+
topicName = topicName,
35+
maybeTopicOptions = None
36+
)
37+
val config: Config[TopicConfig] = (Config.string("topic") ++ TopicOptions.config).map { case (topicName, maybeTopicOptions) =>
38+
TopicConfig(topicName, maybeTopicOptions)
39+
}
2840
}
2941

3042
final case class KafkaConfig(
3143
brokers: List[String],
3244
maybeRegistry: Option[RegistryConfig],
3345
closeTimeout: Duration,
3446
bufferSize: Int,
35-
sink: SinkConfig,
36-
state: StateConfig,
47+
sink: TopicConfig,
48+
state: TopicConfig,
49+
groupId: String,
50+
clientId: String,
3751
transactionalId: String,
3852
properties: Map[String, AnyRef]
3953
)
@@ -43,8 +57,10 @@ object KafkaConfig {
4357
maybeRegistry: Option[RegistryConfig],
4458
closeTimeout: Duration,
4559
bufferSize: Int,
46-
sink: SinkConfig,
47-
state: StateConfig,
60+
sink: TopicConfig,
61+
state: TopicConfig,
62+
groupId: String,
63+
clientId: String,
4864
transactionalId: String
4965
): KafkaConfig = new KafkaConfig(
5066
brokers = brokers,
@@ -53,6 +69,8 @@ object KafkaConfig {
5369
bufferSize = bufferSize,
5470
sink = sink,
5571
state = state,
72+
groupId = groupId,
73+
clientId = clientId,
5674
transactionalId = transactionalId,
5775
properties = Map.empty
5876
)
@@ -62,11 +80,13 @@ object KafkaConfig {
6280
RegistryConfig.config.nested("schema_registry") ++
6381
Config.duration("close_timeout") ++
6482
Config.int("buffer_size") ++
65-
SinkConfig.config.nested("sink") ++
66-
StateConfig.config.nested("state") ++
83+
TopicConfig.config.nested("sink") ++
84+
TopicConfig.config.nested("state") ++
85+
Config.string("group_id") ++
86+
Config.string("client_id") ++
6787
Config.string("transactional_id")
68-
).map { case (brokers, maybeRegistry, closeTimeout, bufferSize, sink, state, transactionalId) =>
69-
KafkaConfig(brokers, maybeRegistry, closeTimeout, bufferSize, sink, state, transactionalId)
88+
).map { case (brokers, maybeRegistry, closeTimeout, bufferSize, sink, state, groupId, clientId, transactionalId) =>
89+
KafkaConfig(brokers, maybeRegistry, closeTimeout, bufferSize, sink, state, groupId, clientId, transactionalId)
7090
}.nested("kafka")
7191

7292
final val fromEnvironment: TaskLayer[KafkaConfig] = ZLayer {

core/src/test/scala/tamer/FakeKafka.scala

+8-5
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,18 @@ object FakeKafka {
3434
for {
3535
randomString <- Random.nextUUID.map(uuid => s"test-$uuid")
3636
fakeKafka <- ZIO.service[FakeKafka]
37-
_ <- fakeKafka.createTopic(s"sink.topic.tape.$randomString")
37+
_ <- fakeKafka.createTopic(s"sink.topic.$randomString")
38+
_ <- fakeKafka.createTopic(s"state.topic.$randomString")
3839
} yield KafkaConfig(
3940
brokers = fakeKafka.bootstrapServers,
40-
maybeRegistry = Some(RegistryConfig(fakeKafka.schemaRegistryUrl, 1000)),
41+
maybeRegistry = Some(RegistryConfig(fakeKafka.schemaRegistryUrl)),
4142
closeTimeout = 1.second,
4243
bufferSize = 5,
43-
sink = SinkConfig(s"sink.topic.$randomString"),
44-
state = StateConfig(s"sink.topic.tape.$randomString", s"embedded.groupid.$randomString", s"embedded.clientid.$randomString"),
45-
transactionalId = s"test-transactional-id-$randomString"
44+
sink = TopicConfig(s"sink.topic.$randomString"),
45+
state = TopicConfig(s"state.topic.$randomString"),
46+
groupId = s"groupid.$randomString",
47+
clientId = s"clientid.$randomString",
48+
transactionalId = s"transactionalid.$randomString"
4649
)
4750
}
4851

db/local/docker-compose.yml

+7-38
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ services:
2121

2222
zookeeper:
2323
container_name: local-zookeeper
24-
image: confluentinc/cp-zookeeper:6.1.1
24+
image: confluentinc/cp-zookeeper:latest
2525
ports:
2626
- 2181:2181
2727
hostname: zookeeper
@@ -32,7 +32,7 @@ services:
3232

3333
kafka:
3434
container_name: local-kafka
35-
image: confluentinc/cp-kafka:6.2.1
35+
image: confluentinc/cp-kafka:latest
3636
depends_on:
3737
- zookeeper
3838
ports:
@@ -43,47 +43,16 @@ services:
4343
networks:
4444
- local_kafka_network
4545
environment:
46+
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
47+
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
48+
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
4649
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
4750
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
4851
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
4952

50-
kafka2:
51-
container_name: local-kafka2
52-
image: confluentinc/cp-kafka:6.2.1
53-
depends_on:
54-
- zookeeper
55-
ports:
56-
- 9093:9093
57-
- 9104:9104
58-
- 29093:29093
59-
hostname: kafka2
60-
networks:
61-
- local_kafka_network
62-
environment:
63-
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
64-
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
65-
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
66-
67-
kafka3:
68-
container_name: local-kafka3
69-
image: confluentinc/cp-kafka:6.2.1
70-
depends_on:
71-
- zookeeper
72-
ports:
73-
- 9095:9095
74-
- 9106:9106
75-
- 29095:29095
76-
hostname: kafka3
77-
networks:
78-
- local_kafka_network
79-
environment:
80-
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
81-
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
82-
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:29095,PLAINTEXT_HOST://localhost:9095
83-
8453
kafka-rest:
8554
container_name: local-kafka-rest
86-
image: confluentinc/cp-kafka-rest:6.2.1
55+
image: confluentinc/cp-kafka-rest:latest
8756
depends_on:
8857
- kafka
8958
- schema-registry
@@ -114,7 +83,7 @@ services:
11483

11584
schema-registry:
11685
container_name: local-schema-registry
117-
image: confluentinc/cp-schema-registry:6.2.1
86+
image: confluentinc/cp-schema-registry:latest
11887
depends_on:
11988
- kafka
12089
ports:

db/local/runDatabaseSimple.sh

+9-3
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,16 @@ export KAFKA_SCHEMA_REGISTRY_URL=http://localhost:8081
1313
export KAFKA_CLOSE_TIMEOUT=10s
1414
export KAFKA_BUFFER_SIZE=50
1515
export KAFKA_SINK_TOPIC=sink
16+
export KAFKA_SINK_AUTO_CREATE=on
17+
export KAFKA_SINK_PARTITIONS=1
18+
export KAFKA_SINK_REPLICAS=1
1619
export KAFKA_STATE_TOPIC=state
17-
export KAFKA_STATE_GROUP_ID=state-group
18-
export KAFKA_STATE_CLIENT_ID=state-client
19-
export KAFKA_TRANSACTIONAL_ID=transaction-id
20+
export KAFKA_STATE_AUTO_CREATE=on
21+
export KAFKA_STATE_PARTITIONS=1
22+
export KAFKA_STATE_REPLICAS=1
23+
export KAFKA_GROUP_ID=groupid
24+
export KAFKA_CLIENT_ID=clientid
25+
export KAFKA_TRANSACTIONAL_ID=transactionid
2026

2127
SCRIPT_PATH=$(cd "$(dirname "${BASH_SOURCE[0]}")" || exit; pwd -P)
2228
cd "$SCRIPT_PATH"/../.. || exit

example/src/main/scala/tamer/db/DatabaseGeneralized.scala

+13-14
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,17 @@ import zio._
88
object DatabaseGeneralized extends ZIOAppDefault {
99
import implicits._
1010

11-
override final val run =
12-
Clock.instant.flatMap { bootTime =>
13-
DbSetup(MyState(bootTime - 60.days, bootTime - 60.days + 5.minutes))(s =>
14-
sql"""SELECT id, name, description, modified_at FROM users WHERE modified_at > ${s.from} AND modified_at <= ${s.to}""".query[Row]
15-
)(
16-
recordKey = (_, v) => v.id,
17-
stateFold = {
18-
case (s, QueryResult(_, results)) if results.isEmpty => Clock.instant.map(now => MyState(s.from, (s.to + 5.minutes).or(now)))
19-
case (_, QueryResult(_, results)) =>
20-
val mostRecent = results.sortBy(_.modifiedAt).max.timestamp
21-
Clock.instant.map(now => MyState(mostRecent, (mostRecent + 5.minutes).or(now)))
22-
}
23-
).runWith(dbLayerFromEnvironment ++ KafkaConfig.fromEnvironment)
24-
}
11+
override final val run = Clock.instant.flatMap { bootTime =>
12+
DbSetup(MyState(bootTime - 60.days, bootTime - 60.days + 5.minutes))(s =>
13+
sql"""SELECT id, name, description, modified_at FROM users WHERE modified_at > ${s.from} AND modified_at <= ${s.to}""".query[Row]
14+
)(
15+
recordKey = (_, v) => v.id,
16+
stateFold = {
17+
case (s, QueryResult(_, results)) if results.isEmpty => Clock.instant.map(now => MyState(s.from, (s.to + 5.minutes).or(now)))
18+
case (_, QueryResult(_, results)) =>
19+
val mostRecent = results.sortBy(_.modifiedAt).max.timestamp
20+
Clock.instant.map(now => MyState(mostRecent, (mostRecent + 5.minutes).or(now)))
21+
}
22+
).runWith(dbLayerFromEnvironment ++ KafkaConfig.fromEnvironment)
23+
}
2524
}

example/src/main/scala/tamer/s3/S3Generalized.scala

+12-4
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,17 @@ object S3Generalized extends ZIOAppDefault {
3333
}
3434

3535
val myKafkaConfigLayer = ZLayer.succeed {
36-
val kafkaSink = SinkConfig("sink-topic")
37-
val kafkaState = StateConfig("state-topic", "groupid", "clientid")
38-
KafkaConfig(List("localhost:9092"), Some(RegistryConfig("http://localhost:8081")), 10.seconds, 50, kafkaSink, kafkaState, "s3-generalized-id")
36+
KafkaConfig(
37+
List("localhost:9092"),
38+
Some(RegistryConfig("http://localhost:8081")),
39+
10.seconds,
40+
50,
41+
TopicConfig("sink", Some(TopicOptions(1, 1))),
42+
TopicConfig("state", Some(TopicOptions(1, 1))),
43+
"groupid",
44+
"clientid",
45+
"s3-generalized-id"
46+
)
3947
}
4048

4149
override final val run = S3Setup(
@@ -48,5 +56,5 @@ object S3Generalized extends ZIOAppDefault {
4856
recordKey = (l: Long, _: String) => l,
4957
selectObjectForState = (l: Long, _: Keys) => internals.selectObjectForInstant(l),
5058
stateFold = internals.getNextState
51-
).runWith(Scope.default >>> (liveZIO(AF_SOUTH_1, s3.providers.default, Some(new URI("http://localhost:9000"))) ++ myKafkaConfigLayer))
59+
).runWith(liveZIO(AF_SOUTH_1, ZIO.scoped(s3.providers.default), Some(new URI("http://localhost:9000"))) ++ myKafkaConfigLayer)
5260
}

example/src/main/scala/tamer/s3/S3Simple.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ object S3Simple extends ZIOAppDefault {
1818
from = Instant.parse("2020-12-03T10:15:30.00Z"),
1919
dateTimeFormatter = ZonedDateTimeFormatter.fromPattern("yyyy-MM-dd HH:mm:ss", ZoneId.of("Europe/Rome"))
2020
)
21-
.runWith(Scope.default >>> (liveZIO(AF_SOUTH_1, s3.providers.default, Some(new URI("http://localhost:9000"))) ++ KafkaConfig.fromEnvironment))
21+
.runWith(liveZIO(AF_SOUTH_1, ZIO.scoped(s3.providers.default), Some(new URI("http://localhost:9000"))) ++ KafkaConfig.fromEnvironment)
2222
}

rest/local/docker-compose.yml

+7-38
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ services:
44

55
zookeeper:
66
container_name: local-zookeeper
7-
image: confluentinc/cp-zookeeper:6.1.1
7+
image: confluentinc/cp-zookeeper:latest
88
ports:
99
- 2181:2181
1010
hostname: zookeeper
@@ -15,7 +15,7 @@ services:
1515

1616
kafka:
1717
container_name: local-kafka
18-
image: confluentinc/cp-kafka:6.2.1
18+
image: confluentinc/cp-kafka:latest
1919
depends_on:
2020
- zookeeper
2121
ports:
@@ -26,47 +26,16 @@ services:
2626
networks:
2727
- local_kafka_network
2828
environment:
29+
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
30+
- KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1
31+
- KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
2932
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
3033
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
3134
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
3235

33-
kafka2:
34-
container_name: local-kafka2
35-
image: confluentinc/cp-kafka:6.2.1
36-
depends_on:
37-
- zookeeper
38-
ports:
39-
- 9093:9093
40-
- 9104:9104
41-
- 29093:29093
42-
hostname: kafka2
43-
networks:
44-
- local_kafka_network
45-
environment:
46-
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
47-
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
48-
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:29093,PLAINTEXT_HOST://localhost:9093
49-
50-
kafka3:
51-
container_name: local-kafka3
52-
image: confluentinc/cp-kafka:6.2.1
53-
depends_on:
54-
- zookeeper
55-
ports:
56-
- 9095:9095
57-
- 9106:9106
58-
- 29095:29095
59-
hostname: kafka3
60-
networks:
61-
- local_kafka_network
62-
environment:
63-
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
64-
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
65-
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:29095,PLAINTEXT_HOST://localhost:9095
66-
6736
kafka-rest:
6837
container_name: local-kafka-rest
69-
image: confluentinc/cp-kafka-rest:6.2.1
38+
image: confluentinc/cp-kafka-rest:latest
7039
depends_on:
7140
- kafka
7241
- schema-registry
@@ -97,7 +66,7 @@ services:
9766

9867
schema-registry:
9968
container_name: local-schema-registry
100-
image: confluentinc/cp-schema-registry:6.2.1
69+
image: confluentinc/cp-schema-registry:latest
10170
depends_on:
10271
- kafka
10372
ports:

0 commit comments

Comments (0)