Skip to content

Commit

Permalink
Zio layers (#195)
Browse files Browse the repository at this point in the history
* zio 1 (aka layer all-the-things) + error simplification

* fmt code

* bumped log-effect to 0.14.1
  • Loading branch information
sirocchj authored Dec 2, 2020
1 parent 2116624 commit 0155375
Show file tree
Hide file tree
Showing 16 changed files with 263 additions and 265 deletions.
10 changes: 4 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ lazy val V = new {
val doobie = "0.9.4"
val kafka = "2.6.0"
val logback = "1.2.3"
val `log-effect` = "0.12.1"
val `log-effect` = "0.14.1"
val postgres = "42.2.18"
val refined = "0.9.19"
val scalacheck = "1.15.1"
val scalatest = "3.2.3"
val silencer = "1.7.1"
val zio = "1.0.0-RC17"
val `zio-interop` = "2.0.0.0-RC10"
val `zio-kafka` = "0.5.0"
val `zio-macros` = "0.6.2"
val zio = "1.0.3"
val `zio-interop` = "2.2.0.1"
val `zio-kafka` = "0.13.0"
}

lazy val D = new {
Expand Down Expand Up @@ -79,7 +78,6 @@ lazy val D = new {
val zio = Seq(
"dev.zio" %% "zio-interop-cats" % V.`zio-interop`,
"dev.zio" %% "zio-kafka" % V.`zio-kafka`,
"dev.zio" %% "zio-macros-core" % V.`zio-macros`,
"dev.zio" %% "zio-streams" % V.zio
)
}
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/scala/tamer/Serdes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import java.nio.ByteBuffer

import com.sksamuel.avro4s._
import org.apache.avro.Schema
import tamer.registry._
import tamer.registry.{Registry, Topic}
import zio.{RIO, Task}
import zio.kafka.client.serde.{Deserializer, Serializer}
import zio.kafka.serde.{Deserializer, Serializer}

sealed trait Serde[A] extends Any {
def isKey: Boolean
Expand All @@ -28,12 +28,11 @@ object Serde {
private[this] def subject(topic: String): String = s"$topic-${if (isKey) "key" else "value"}"
override final val deserializer: Deserializer[Registry with Topic, A] = Deserializer.byteArray.mapM { ba =>
val buffer = ByteBuffer.wrap(ba)
if (buffer.get() != Magic) RIO.fail(SerializationError("Unknown magic byte!"))
if (buffer.get() != Magic) RIO.fail(TamerError("Deserialization failed: unknown magic byte!"))
else {
val id = buffer.getInt()
for {
env <- RIO.environment[Registry]
_ <- env.registry.verifySchema(id, schema)
_ <- registry.verifySchema(id, schema)
res <- RIO.fromTry {
val length = buffer.limit() - 1 - intByteSize
val payload = new Array[Byte](length)
Expand All @@ -45,8 +44,8 @@ object Serde {
}
override final val serializer: Serializer[Registry with Topic, A] = Serializer.byteArray.contramapM { a =>
for {
env <- RIO.environment[Registry with Topic]
id <- env.registry.getOrRegisterId(subject(env.topic), schema)
t <- registry.topic
id <- registry.getOrRegisterId(subject(t), schema)
arr <- Task {
val baos = new ByteArrayOutputStream
baos.write(Magic.toInt)
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/tamer/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package tamer

import com.sksamuel.avro4s._
import doobie.util.query.Query0
import tamer.registry.Registry
import tamer.registry.{Registry, Topic}
import zio.UIO
import zio.kafka.client.serde.Serializer
import zio.kafka.serde.Serializer

// Metadata attached to a single query execution. queryExecutionTime appears to be a
// duration in epoch-milliseconds arithmetic (see InstantOps in Db.scala) — TODO confirm unit.
final case class ResultMetadata(queryExecutionTime: Long)
// Outcome of one query run: its execution metadata plus the fetched rows of type V.
final case class QueryResult[V](metadata: ResultMetadata, results: List[V])
Expand Down
34 changes: 17 additions & 17 deletions core/src/main/scala/tamer/TamerApp.scala
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
package tamer

import tamer.config.Config
import tamer.db.Db
import tamer.config._
import tamer.db.{Db, DbTransactor}
import tamer.kafka.Kafka
import zio._
import zio.{App, ExitCode, IO, Layer, URIO, ZEnv}
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console._

abstract class TamerApp[K, V, State](private val setup: IO[SetupError, Setup[K, V, State]]) extends App {
final val run: ZIO[Blocking with Clock with Config with Db with Kafka, TamerError, Unit] =
for {
setup <- setup
config <- Config.Live.config.load
blockingEC <- blocking.blockingExecutor.map(_.asEC)
program <- Db.mkTransactor(config.db, platform.executor.asEC, blockingEC).use { tnx =>
Kafka.>.run(config.kafka, setup)(Db.>.runQuery(tnx, setup, config.query))
}
} yield program
abstract class TamerApp[K, V, State](private val setup: IO[TamerError, Setup[K, V, State]]) extends App {
final val run = for {
setup <- setup
program <- kafka.runLoop(setup)(db.runQuery(setup))
} yield program

override final def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
override final def run(args: List[String]): URIO[ZEnv, ExitCode] = {
val transactorLayer: Layer[TamerError, DbTransactor] = (Blocking.live ++ Config.live) >>> Db.hikariLayer
val dbLayer: Layer[TamerError, Db] = (Config.live ++ transactorLayer) >>> Db.live
val kafkaLayer: Layer[TamerError, Kafka] = Config.live >>> Kafka.live
run
.provide(new Blocking.Live with Clock.Live with Config.Live with Db.Live with Kafka.Live {})
.provideLayer(Blocking.live ++ Clock.live ++ dbLayer ++ kafkaLayer)
.foldM(
err => console.putStrLn(s"Execution failed with: $err") *> IO.succeed(1),
_ => IO.succeed(0)
err => putStrLn(s"Execution failed with: $err") *> IO.succeed(ExitCode.failure),
_ => IO.succeed(ExitCode.success)
)
}
}
3 changes: 0 additions & 3 deletions core/src/main/scala/tamer/Topic.scala

This file was deleted.

101 changes: 48 additions & 53 deletions core/src/main/scala/tamer/config/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,68 @@ package tamer
package config

import cats.implicits._
import ciris._
import ciris.refined._
import ciris.{ConfigError => CirisConfigError, _}
import eu.timepit.refined.auto._
import eu.timepit.refined.types.numeric.PosInt
import eu.timepit.refined.types.string.NonEmptyString
import zio._
import zio.interop.catz._
import zio.{IO, Task, ZIO}

import scala.concurrent.duration.FiniteDuration

final case class DbConfig(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password)
final case class QueryConfig(fetchChunkSize: PosInt)
final case class KafkaSinkConfig(topic: NonEmptyString)
final case class KafkaStateConfig(topic: NonEmptyString, groupId: NonEmptyString, clientId: NonEmptyString)
final case class KafkaConfig(
brokers: HostList,
schemaRegistryUrl: UrlString,
closeTimeout: FiniteDuration,
bufferSize: PosInt,
sink: KafkaSinkConfig,
state: KafkaStateConfig
)
final case class TamerConfig(db: DbConfig, query: QueryConfig, kafka: KafkaConfig)

trait Config extends Serializable {
val config: Config.Service[Any]
}

object Config {
trait Service[R] {
val load: ZIO[R, ConfigError, TamerConfig]
}
// Source-database connection settings: JDBC driver class name, connection URI and credentials.
// Password is loaded redacted from the environment (DATABASE_PASSWORD, see dbConfigValue).
final case class Db(driver: NonEmptyString, uri: UriString, username: NonEmptyString, password: Password)
// Query tuning: fetchChunkSize is fed to doobie's streamWithChunkSize (rows per fetch).
final case class Query(fetchChunkSize: PosInt)
// Kafka topic that receives the produced records.
final case class KafkaSink(topic: NonEmptyString)
// Kafka topic + consumer identity used to persist and replay the tamer state.
final case class KafkaState(topic: NonEmptyString, groupId: NonEmptyString, clientId: NonEmptyString)
// Kafka connectivity and buffering settings shared by sink and state handling.
final case class Kafka(
brokers: HostList,
schemaRegistryUrl: UrlString,
closeTimeout: FiniteDuration,
bufferSize: PosInt,
sink: KafkaSink,
state: KafkaState
)
// Aggregate of all configuration sections; split into Has[_] slices by Config.live.
final case class Tamer(db: Db, query: Query, kafka: Kafka)

trait Live extends Config {
private[this] implicit final val hostListConfigDecoder: ConfigDecoder[String, HostList] =
ConfigDecoder.identity[String].map(_.split(",").toList.map(_.trim)).mapEither(ConfigDecoder[List[String], HostList].decode)
private[this] implicit final val hostListConfigDecoder: ConfigDecoder[String, HostList] =
ConfigDecoder.identity[String].map(_.split(",").toList.map(_.trim)).mapEither(ConfigDecoder[List[String], HostList].decode)

override final val config: Service[Any] = new Service[Any] {
private val dbConfig = (
env("DATABASE_DRIVER").as[NonEmptyString],
env("DATABASE_URL").as[UriString],
env("DATABASE_USERNAME").as[NonEmptyString],
env("DATABASE_PASSWORD").as[Password].redacted
).parMapN(DbConfig)
private[this] val dbConfigValue = (
env("DATABASE_DRIVER").as[NonEmptyString],
env("DATABASE_URL").as[UriString],
env("DATABASE_USERNAME").as[NonEmptyString],
env("DATABASE_PASSWORD").as[Password].redacted
).parMapN(Db)

private val queryConfig = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(QueryConfig)
private[this] val queryConfigValue = env("QUERY_FETCH_CHUNK_SIZE").as[PosInt].map(Query)

private val kafkaSinkConfig = env("KAFKA_SINK_TOPIC").as[NonEmptyString].map(KafkaSinkConfig)
private val kafkaStateConfig = (
env("KAFKA_STATE_TOPIC").as[NonEmptyString],
env("KAFKA_STATE_GROUP_ID").as[NonEmptyString],
env("KAFKA_STATE_CLIENT_ID").as[NonEmptyString]
).parMapN(KafkaStateConfig)
private val kafkaConfig = (
env("KAFKA_BROKERS").as[HostList],
env("KAFKA_SCHEMA_REGISTRY_URL").as[UrlString],
env("KAFKA_CLOSE_TIMEOUT").as[FiniteDuration],
env("KAFKA_BUFFER_SIZE").as[PosInt],
kafkaSinkConfig,
kafkaStateConfig
).parMapN(KafkaConfig)
private[this] val kafkaSinkConfigValue = env("KAFKA_SINK_TOPIC").as[NonEmptyString].map(KafkaSink)
private[this] val kafkaStateConfigValue = (
env("KAFKA_STATE_TOPIC").as[NonEmptyString],
env("KAFKA_STATE_GROUP_ID").as[NonEmptyString],
env("KAFKA_STATE_CLIENT_ID").as[NonEmptyString]
).parMapN(KafkaState)
private[this] val kafkaConfigValue = (
env("KAFKA_BROKERS").as[HostList],
env("KAFKA_SCHEMA_REGISTRY_URL").as[UrlString],
env("KAFKA_CLOSE_TIMEOUT").as[FiniteDuration],
env("KAFKA_BUFFER_SIZE").as[PosInt],
kafkaSinkConfigValue,
kafkaStateConfigValue
).parMapN(Kafka)
private[this] val tamerConfigValue: ConfigValue[Tamer] = (dbConfigValue, queryConfigValue, kafkaConfigValue).parMapN(Tamer.apply)

val tamerConfig: ConfigValue[TamerConfig] = (dbConfig, queryConfig, kafkaConfig).parMapN((db, query, kafka) => TamerConfig(db, query, kafka))
trait Service {
val dbConfig: URIO[DbConfig, Db]
val queryConfig: URIO[QueryConfig, Query]
val kafkaConfig: URIO[KafkaConfig, Kafka]
}

override final val load: IO[ConfigError, TamerConfig] =
tamerConfig.load[Task].refineToOrDie[CirisConfigError].mapError(ce => ConfigError(ce.redacted.show))
val live: Layer[TamerError, TamerConfig] = ZLayer.fromEffectMany {
tamerConfigValue.load[Task].refineToOrDie[ConfigException].mapError(ce => TamerError(ce.error.redacted.show, ce)).map {
case Tamer(db, query, kafka) => Has(db) ++ Has(query) ++ Has(kafka)
}
}

object Live extends Live
}
10 changes: 10 additions & 0 deletions core/src/main/scala/tamer/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,20 @@ import eu.timepit.refined.api.Refined
import eu.timepit.refined.boolean.{And, Or}
import eu.timepit.refined.collection.{Forall, NonEmpty}
import eu.timepit.refined.string.{IPv4, Uri, Url}
import zio.{Has, URIO, ZIO}

package object config {
// Comma-separated broker list: must be non-empty and every entry an IPv4 address or a URI.
type HostList = List[String] Refined (NonEmpty And Forall[IPv4 Or Uri])
// Deliberately unrefined alias; the value is redacted at load time by the ciris decoder.
type Password = String
type UriString = String Refined Uri
type UrlString = String Refined Url

// ZIO environment slices — one Has[_] module per configuration section.
type DbConfig = Has[Config.Db]
type QueryConfig = Has[Config.Query]
type KafkaConfig = Has[Config.Kafka]
// Full configuration environment: the intersection of all three sections.
type TamerConfig = DbConfig with QueryConfig with KafkaConfig

// Environment accessors: read one configuration section; cannot fail (URIO).
val dbConfig: URIO[DbConfig, Config.Db] = ZIO.access(_.get)
val queryConfig: URIO[QueryConfig, Config.Query] = ZIO.access(_.get)
val kafkaConfig: URIO[KafkaConfig, Config.Kafka] = ZIO.access(_.get)
}
71 changes: 28 additions & 43 deletions core/src/main/scala/tamer/db/Db.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,14 @@ import eu.timepit.refined.auto._
import fs2.{Chunk, Stream}
import log.effect.LogWriter
import log.effect.zio.ZioLogWriter.log4sFromName
import tamer.config.{DbConfig, QueryConfig}
import tamer.config._
import zio._
import zio.blocking.Blocking
import zio.interop.catz._
import tamer.db.Compat.toIterable

import scala.concurrent.ExecutionContext

trait Db extends Serializable {
val db: Db.Service[Any]
}

object Db {
implicit class InstantOps(ours: Instant) {
def -(theirs: Instant): Long = ours.toEpochMilli - theirs.toEpochMilli
Expand All @@ -31,43 +28,28 @@ object Db {
case class ChunkWithMetadata[V](chunk: Chunk[V], pulledAt: Instant = Instant.now())
case class ValueWithMetadata[V](value: V, pulledAt: Instant = Instant.now())

trait Service[R] {
def runQuery[K, V, State](
tnx: Transactor[Task],
setup: Setup[K, V, State],
queryConfig: QueryConfig
)(state: State, q: Queue[(K, V)]): ZIO[R, DbError, State]
}

object > extends Service[Db] {
override final def runQuery[K, V, State](
tnx: Transactor[Task],
setup: Setup[K, V, State],
queryConfig: QueryConfig
)(state: State, q: Queue[(K, V)]): ZIO[Db, DbError, State] = ZIO.accessM(_.db.runQuery(tnx, setup, queryConfig)(state, q))
trait Service {
def runQuery[K, V, State](setup: Setup[K, V, State])(state: State, q: Queue[(K, V)]): IO[TamerError, State]
}

trait Live extends Db {
override final val db: Service[Any] = new Service[Any] {
private[this] val logTask: Task[LogWriter[Task]] = log4sFromName.provide("tamer.Db.Live")
override final def runQuery[K, V, State](
tnx: Transactor[Task],
setup: Setup[K, V, State],
queryConfig: QueryConfig
)(state: State, q: Queue[(K, V)]): IO[DbError, State] =
// https://github.com/zio/zio/issues/2949
val live: URLayer[DbTransactor with QueryConfig, Db] = ZLayer.fromServices[Transactor[Task], Config.Query, Service] { (tnx, cfg) =>
new Service {
private[this] val logTask: Task[LogWriter[Task]] = log4sFromName.provide("tamer.db")
override final def runQuery[K, V, State](setup: Setup[K, V, State])(state: State, q: Queue[(K, V)]): IO[TamerError, State] =
(for {
log <- logTask
query <- UIO(setup.buildQuery(state))
_ <- log.debug(s"running ${query.sql} with params derived from $state").ignore
start <- UIO(Instant.now())
values <-
query
.streamWithChunkSize(queryConfig.fetchChunkSize)
.streamWithChunkSize(cfg.fetchChunkSize)
.chunks
.transact(tnx)
.map(c => ChunkWithMetadata(c))
.map(ChunkWithMetadata(_))
.evalTap(c => q.offerAll(toIterable(c.chunk).map(v => setup.valueToKey(v) -> v)))
.flatMap(c => Stream.chunk(c.chunk).map(v => ValueWithMetadata(v, c.pulledAt)))
.flatMap(c => Stream.chunk(c.chunk).map(ValueWithMetadata(_, c.pulledAt)))
.compile
.toList
newState <- setup.stateFoldM(state)(
Expand All @@ -76,20 +58,23 @@ object Db {
values.map(_.value)
)
)
} yield newState).mapError(e => DbError(e.getLocalizedMessage))
} yield newState).mapError(e => TamerError(e.getLocalizedMessage, e))
}
}

def mkTransactor(db: DbConfig, connectEC: ExecutionContext, transactEC: ExecutionContext): Managed[DbError, HikariTransactor[Task]] =
Managed {
HikariTransactor
.newHikariTransactor[Task](db.driver, db.uri, db.username, db.password, connectEC, Blocker.liftExecutionContext(transactEC))
.allocated
.map { case (ht, cleanup) =>
Reservation(ZIO.succeed(ht), _ => cleanup.orDie)
}
.uninterruptible
.refineToOrDie[SQLException]
.mapError(sqle => DbError(sqle.getLocalizedMessage()))
}
val hikariLayer: ZLayer[Blocking with DbConfig, TamerError, DbTransactor] = ZLayer.fromManaged {
for {
cfg <- dbConfig.toManaged_
connectEC <- ZIO.descriptor.map(_.executor.asEC).toManaged_
blockingEC <- blocking.blocking(ZIO.descriptor.map(_.executor.asEC)).toManaged_
managedTransactor <- mkTransactor(cfg, connectEC, blockingEC)
} yield managedTransactor
}

def mkTransactor(db: Config.Db, connectEC: ExecutionContext, transactEC: ExecutionContext): Managed[TamerError, HikariTransactor[Task]] =
HikariTransactor
.newHikariTransactor[Task](db.driver, db.uri, db.username, db.password, connectEC, Blocker.liftExecutionContext(transactEC))
.toManagedZIO
.refineToOrDie[SQLException]
.mapError(sqle => TamerError(sqle.getLocalizedMessage, sqle))
}
12 changes: 12 additions & 0 deletions core/src/main/scala/tamer/db/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package tamer

import doobie.util.transactor.Transactor
import zio.{Has, Queue, Task, ZIO}

package object db {
// ZIO module type for the Db service (implementation provided by Db.live).
type Db = Has[Db.Service]
// Doobie transactor wrapped in Has so it can be supplied as a layer (Db.hikariLayer).
type DbTransactor = Has[Transactor[Task]]

// Accessor: delegates to the Db.Service found in the environment. Streams (key, value)
// pairs into the queue and yields the next State, failing with TamerError.
def runQuery[K, V, State](setup: Setup[K, V, State])(state: State, q: Queue[(K, V)]): ZIO[Db, TamerError, State] =
ZIO.accessM(_.get.runQuery(setup)(state, q))
}
Loading

0 comments on commit 0155375

Please sign in to comment.