@@ -13,7 +13,7 @@ import zio.kafka.serde.{Serde => ZSerde, Serializer}
13
13
import zio .stream .{Stream , ZStream }
14
14
15
15
trait Tamer {
16
- def runLoop : IO [ TamerError , Unit ]
16
+ def runLoop : Task [ Unit ]
17
17
}
18
18
19
19
object Tamer {
@@ -24,11 +24,6 @@ object Tamer {
24
24
25
25
private [this ] final val tenTimes = Schedule .recurs(10L ) && Schedule .exponential(100 .milliseconds) // FIXME make configurable
26
26
27
- private [this ] final val tamerErrors : PartialFunction [Throwable , TamerError ] = {
28
- case ke : KafkaException => TamerError (ke.getLocalizedMessage, ke)
29
- case te : TamerError => te
30
- }
31
-
32
27
private [this ] implicit final class OffsetOps (private val _underlying : Offset ) extends AnyVal {
33
28
def info : String = s " ${_underlying.topicPartition}@ ${_underlying.offset}"
34
29
}
@@ -203,16 +198,12 @@ object Tamer {
203
198
runSink <&> runSource
204
199
}
205
200
206
- override val runLoop : IO [TamerError , Unit ] = {
207
- val logic = for {
208
- log <- logTask
209
- _ <- log.info(s " initializing Tamer with setup: \n $repr" )
210
- queue <- Queue .bounded[(TransactionInfo , Chunk [(K , V )])](config.bufferSize)
211
- _ <- runLoop(queue, log)
212
- } yield ()
213
-
214
- logic.refineOrDie(tamerErrors)
215
- }
201
+ override val runLoop : Task [Unit ] = for {
202
+ log <- logTask
203
+ _ <- log.info(s " initializing Tamer with setup: \n $repr" )
204
+ queue <- Queue .bounded[(TransactionInfo , Chunk [(K , V )])](config.bufferSize)
205
+ _ <- runLoop(queue, log)
206
+ } yield ()
216
207
}
217
208
218
209
object LiveTamer {
@@ -224,7 +215,7 @@ object Tamer {
224
215
stateKey : Int ,
225
216
iterationFunction : (SV , Enqueue [NonEmptyChunk [(K , V )]]) => Task [SV ],
226
217
repr : String
227
- ): ZIO [Scope , TamerError , LiveTamer [K , V , SV ]] = {
218
+ ): RIO [Scope , LiveTamer [K , V , SV ]] = {
228
219
229
220
val KafkaConfig (brokers, _, closeTimeout, _, _, StateConfig (_, groupId, clientId), transactionalId, properties) = config
230
221
@@ -251,9 +242,7 @@ object Tamer {
251
242
.mapError(TamerError (" Could not build Kafka client" , _))
252
243
}
253
244
254
- private [tamer] final def getLayer [R , K : Tag , V : Tag , SV : Tag ](
255
- setup : Setup [R , K , V , SV ]
256
- ): ZLayer [R with KafkaConfig , TamerError , Tamer ] =
245
+ private [tamer] final def getLayer [R , K : Tag , V : Tag , SV : Tag ](setup : Setup [R , K , V , SV ]): RLayer [R with KafkaConfig , Tamer ] =
257
246
ZLayer .scoped[R with KafkaConfig ] {
258
247
for {
259
248
config <- ZIO .service[KafkaConfig ]
@@ -265,7 +254,5 @@ object Tamer {
265
254
}
266
255
}
267
256
268
- final def live [R , K : Tag , V : Tag , SV : Tag ](
269
- setup : Setup [R , K , V , SV ]
270
- ): ZLayer [R with KafkaConfig , TamerError , Tamer ] = LiveTamer .getLayer(setup)
257
+ final def live [R , K : Tag , V : Tag , SV : Tag ](setup : Setup [R , K , V , SV ]): RLayer [R with KafkaConfig , Tamer ] = LiveTamer .getLayer(setup)
271
258
}
0 commit comments