Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scalacheck for aggregates and projections #22

Merged
merged 28 commits into from
Feb 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7645794
add scalacheck
svalaskevicius Nov 6, 2015
670eb8c
remove logging messages (noise)
svalaskevicius Nov 6, 2015
08cc397
add basic counter test
svalaskevicius Nov 6, 2015
0eaa7dd
also cover negative cases
svalaskevicius Nov 6, 2015
34dc2a9
refactor test
svalaskevicius Nov 15, 2015
9fdcee8
add close/open doors test
svalaskevicius Nov 16, 2015
a6adc53
add lock/unlock test cases
svalaskevicius Nov 16, 2015
61455b6
remove repetition
svalaskevicius Nov 16, 2015
56e2e78
refactor aggregate
svalaskevicius Nov 22, 2015
ab252d3
refactor flow aggregate
svalaskevicius Nov 22, 2015
4111b1e
wip: add book example
svalaskevicius Feb 21, 2016
d15ee38
improve tests
svalaskevicius Feb 28, 2016
25a3453
improve given / when / then checks
svalaskevicius Feb 28, 2016
4d3fdfb
remove dots
svalaskevicius Feb 28, 2016
71dd88c
add projection test
svalaskevicius Feb 28, 2016
e00ac19
simplify lines
svalaskevicius Feb 28, 2016
73b11c0
name projections
svalaskevicius Feb 28, 2016
2c60642
remove list from eventflow init
svalaskevicius Feb 28, 2016
9bc735e
update door tests
svalaskevicius Feb 28, 2016
8f2c507
delete unnecessary file
svalaskevicius Feb 28, 2016
c1fb804
Revert "wip: add book example"
svalaskevicius Feb 28, 2016
a92e350
allow custom data in initial commands
svalaskevicius Feb 28, 2016
eba8160
Merge pull request #23 from svalaskevicius/add-book
Feb 28, 2016
be783bd
move errors to db
svalaskevicius Feb 28, 2016
0f996a9
organise imports
svalaskevicius Feb 28, 2016
c09a92b
separate db elements
svalaskevicius Feb 28, 2016
00bba6b
rename for clarity
svalaskevicius Feb 28, 2016
85f937d
Merge pull request #24 from svalaskevicius/separate-db-elements
Feb 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ libraryDependencies += "com.lihaoyi" %% "pprint" % "0.3.6"
libraryDependencies += "com.lihaoyi" %% "upickle" % "0.3.6"
libraryDependencies += "com.chuusai" %% "shapeless" % "2.2.5"

libraryDependencies += "org.scalacheck" %% "scalacheck" % "1.12.4" % "test"
libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"


resolvers += Resolver.sonatypeRepo("releases")

addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.6.3")
Expand Down
132 changes: 62 additions & 70 deletions src/main/scala/Cqrs/Aggregate.scala
Original file line number Diff line number Diff line change
@@ -1,133 +1,125 @@
package Cqrs

import cats.data.{ Xor, XorT }
import cats.Monad
import cats._
import cats.free.Free
import cats.free.Free.{ pure, liftF }
import Cqrs.Aggregate.AggregateId
import Cqrs.Database.{EventDatabaseWithFailure, VersionedEvents}
import cats.{MonadState, MonadError}
import cats.data.{Xor, XorT}
import cats.state._

import cats.syntax.flatMap._
import scala.language.implicitConversions

object Aggregate {

final case class Tag(v: String)
final case class AggregateId(v: String)
val emptyAggregateId = AggregateId("")
implicit def toAggregateId(v: String): AggregateId = AggregateId(v)

implicit def aggOrdering(implicit ev: Ordering[String]): Ordering[AggregateId] = new Ordering[AggregateId] {
def compare(a: AggregateId, b: AggregateId) = ev.compare(a.v, b.v)
}

trait Error
sealed trait Error
final case class ErrorExistsAlready(id: AggregateId) extends Error
final case class ErrorDoesNotExist(id: AggregateId) extends Error
final case class ErrorUnexpectedVersion(id: AggregateId, currentVersion: Int, targetVersion: Int) extends Error
final case class ErrorCommandFailure(message: String) extends Error
final case class DatabaseError(err: Database.Error) extends Error
case object ErrorCannotFindHandler extends Error

final case class VersionedEvents[E](version: Int, events: List[E])

sealed trait EventDatabaseOp[E, A]
final case class ReadAggregateExistence[E](tag: Tag, id: AggregateId) extends EventDatabaseOp[E, Error Xor Boolean]
final case class ReadAggregate[E](tag: Tag, id: AggregateId, fromVersion: Int) extends EventDatabaseOp[E, Error Xor List[VersionedEvents[E]]]
final case class AppendAggregateEvents[E](tag: Tag, id: AggregateId, events: VersionedEvents[E]) extends EventDatabaseOp[E, Error Xor Unit]
type DatabaseWithAnyFailure[E, Err, A] = XorT[EventDatabaseWithFailure[E, ?], Err, A]
type DatabaseWithAggregateFailure[E, A] = DatabaseWithAnyFailure[E, Error, A]

type EventDatabase[E, A] = Free[EventDatabaseOp[E, ?], A]
type EventDatabaseWithAnyFailure[E, Err, A] = XorT[EventDatabase[E, ?], Err, A]
type EventDatabaseWithFailure[E, A] = EventDatabaseWithAnyFailure[E, Error, A]
def dbAction[E, A](dbActions: Database.EventDatabaseWithFailure[E, A]): DatabaseWithAggregateFailure[E, A] =
XorT[EventDatabaseWithFailure[E, ?], Error, A](dbActions.map(Xor.right))

final case class AggregateState[D](id: AggregateId, state: D, version: Int)
type AggregateDefAnyD[E, D, A] = StateT[EventDatabaseWithFailure[E, ?], D, A]
type AggregateDefAnyD[E, D, A] = StateT[DatabaseWithAggregateFailure[E, ?], D, A]
type AggregateDef[E, D, A] = AggregateDefAnyD[E, AggregateState[D], A]

def lift[E, A](a: EventDatabaseOp[E, Error Xor A]): EventDatabaseWithFailure[E, A] =
XorT[EventDatabase[E, ?], Error, A](liftF[EventDatabaseOp[E, ?], Error Xor A](a))
implicit def eventDatabaseWithFailureMonad[E]: MonadError[DatabaseWithAnyFailure[E, ?, ?], Error] = XorT.xorTMonadError[EventDatabaseWithFailure[E, ?], Error]
implicit def aggregateDefMonad[E, D]: MonadState[AggregateDefAnyD[E, ?, ?], AggregateState[D]] = StateT.stateTMonadState[DatabaseWithAggregateFailure[E, ?], AggregateState[D]]

def doesAggregateExist[E](tag: Tag, id: AggregateId): EventDatabaseWithFailure[E, Boolean] = lift(ReadAggregateExistence[E](tag, id))

def readNewEvents[E](tag: Tag, id: AggregateId, fromVersion: Int): EventDatabaseWithFailure[E, List[VersionedEvents[E]]] =
lift(ReadAggregate[E](tag, id, fromVersion))

def appendEvents[E](tag: Tag, id: AggregateId, events: VersionedEvents[E]): EventDatabaseWithFailure[E, Unit] =
lift(AppendAggregateEvents(tag, id, events))

implicit def eventDatabaseMonad[E]: Monad[EventDatabase[E, ?]] = Free.freeMonad[EventDatabaseOp[E, ?]]
implicit def eventDatabaseWithFailureMonad[E]: MonadError[EventDatabaseWithAnyFailure[E, ?, ?], Error] = XorT.xorTMonadError[EventDatabase[E, ?], Error]
implicit def aggregateDefMonad[E, D]: MonadState[AggregateDefAnyD[E, ?, ?], AggregateState[D]] = StateT.stateTMonadState[EventDatabaseWithFailure[E, ?], AggregateState[D]]

def pure[E, A](x: A): EventDatabaseWithFailure[E, A] = eventDatabaseWithFailureMonad[E].pure(x)
def pure[E, A](x: A): DatabaseWithAggregateFailure[E, A] = eventDatabaseWithFailureMonad[E].pure(x)
def fail[E, A](x: Error): DatabaseWithAggregateFailure[E, A] = eventDatabaseWithFailureMonad[E].raiseError[A](x)

def emitEvent[E](ev: E): Error Xor List[E] = Xor.right(List(ev))
def emitEvents[E](evs: List[E]): Error Xor List[E] = Xor.right(evs)

def failCommand[Events](err: String): Error Xor Events = Xor.left(ErrorCommandFailure(err))
}

type StatefulEventDatabaseWithFailure[E, D, A] = EventDatabaseWithFailure[E, (AggregateState[D], A)]
trait InitialAggregateCommand {
def id: AggregateId
}

def runAggregateFromStart[E, D, A](a: AggregateDef[E, D, A], initState: D): StatefulEventDatabaseWithFailure[E, D, A] =
a.run(AggregateState(emptyAggregateId, initState, 0))
trait Aggregate[E, C, D] {

def continueAggregate[E, D, A](a: AggregateDef[E, D, A], state: AggregateState[D]): StatefulEventDatabaseWithFailure[E, D, A] =
a.run(state)
import Aggregate._

}
def tag: Aggregate.Tag

final case class Aggregate[E, C, D](
on: Aggregate[E, C, D]#EventHandler,
handle: Aggregate[E, C, D]#CommandHandler,
tag: Aggregate.Tag
) {
import Aggregate._
protected def on: EventHandler
protected def handle: CommandHandler
protected def initData: D

type State = AggregateState[D]
type ADStateRun[A] = AggregateState[D] => EventDatabaseWithFailure[E, (AggregateState[D], A)]
type AD[A] = AggregateDef[E, D, A]
def AD[A](a: ADStateRun[A]): AD[A] = StateT[EventDatabaseWithFailure[E, ?], AggregateState[D], A](a)
type ADStateRun[A] = AggregateState[D] => DatabaseWithAggregateFailure[E, (AggregateState[D], A)]

type AggregateDefinition[A] = AggregateDef[E, D, A]
def defineAggregate[A](a: ADStateRun[A]): AggregateDefinition[A] = StateT[DatabaseWithAggregateFailure[E, ?], AggregateState[D], A](a)

type Events = List[E]
type CommandHandler = C => D => Aggregate.Error Xor Events
type EventHandler = E => D => D

def liftToAggregateDef[A](f: EventDatabaseWithFailure[E, A]): AD[A] = AD(s => f.map((s, _)))
def liftToAggregateDef[A](f: DatabaseWithAggregateFailure[E, A]): AggregateDefinition[A] = defineAggregate(s => f.map((s, _)))

def initAggregate[Cmd <: InitialAggregateCommand with C](initCmd: Cmd): DatabaseWithAggregateFailure[E, State] = {
import Database._
val id = initCmd.id
val initState: DatabaseWithAggregateFailure[E, State] =
dbAction(doesAggregateExist[E](tag, id)).flatMap { e: Boolean =>
if (e) fail(ErrorExistsAlready(id))
else dbAction(appendEvents[E](tag, id, VersionedEvents[E](1, List())).map(_ => newState(id)))
}
initState.flatMap(handleCommand(initCmd).runS)
}

def initAggregate(id: AggregateId): AD[Unit] =
liftToAggregateDef(doesAggregateExist(tag, id)) >>=
((e: Boolean) => AD(
vs => {
if (e) XorT.left[EventDatabase[E, ?], Error, (AggregateState[D], Unit)](eventDatabaseMonad[E].pure(ErrorExistsAlready(id)))
else appendEvents(tag, id, VersionedEvents[E](1, List())).map(_ => (vs.copy(id = id), ()))
}
))
def newState(id: AggregateId) = new State(id, initData, 0)

def handleCommand(cmd: C): AD[Unit] = {
def handleCommand(cmd: C): AggregateDefinition[Unit] = {
import Database._
for {
events <- AD(vs => readNewEvents[E](tag, vs.id, vs.version).map((vs, _)))
events <- defineAggregate(vs => dbAction(readNewEvents[E](tag, vs.id, vs.version).map((vs, _))))
_ <- applyEvents(events)
resultEvents <- handleCmd(cmd)
_ <- onEvents(resultEvents)
} yield ()
}

private def handleCmd(cmd: C): AD[Events] = AD(vs =>
XorT.fromXor[EventDatabase[E, ?]][Error, Events](handle(cmd)(vs.state)).map((vs, _)))
private def handleCmd(cmd: C): AggregateDefinition[Events] = defineAggregate(vs =>
XorT.fromXor[EventDatabaseWithFailure[E, ?]](
handle(cmd)(vs.state)
).map((vs, _))
)

private def onEvents(evs: Events): AD[Unit] =
AD(vs => {
private def onEvents(evs: Events): AggregateDefinition[Unit] =
defineAggregate { vs =>
import Database._
val vevs = VersionedEvents[E](vs.version + 1, evs)
appendEvents(tag, vs.id, vevs).map(_ => (vs, List(vevs)))
}) >>=
applyEvents _
dbAction(appendEvents(tag, vs.id, vevs).map(_ => (vs, List(vevs))))
}.flatMap(applyEvents)

private def applyEvents(evs: List[VersionedEvents[E]]): AD[Unit] =
AD(vs => {
private def applyEvents(evs: List[VersionedEvents[E]]): AggregateDefinition[Unit] =
defineAggregate { vs =>
val vs_ = evs.foldLeft(vs)((vs_, ve) => {
if (vs_.version < ve.version) {
vs_.copy(state = ve.events.foldLeft(vs_.state)((d, e) => on(e)(d)), version = ve.version)
} else {
vs_
}
})
eventDatabaseWithFailureMonad[E].pure((vs_, ()))
})
pure((vs_, ()))
}
}

42 changes: 27 additions & 15 deletions src/main/scala/Cqrs/BatchRunner.scala
Original file line number Diff line number Diff line change
@@ -1,40 +1,41 @@
package Cqrs

import Cqrs.Database.{ Backend, EventSerialisation }
import Cqrs.Aggregate._
import Cqrs.Database.{Backend, EventSerialisation}
import cats.data.Xor
import cats.state.StateT

import Cqrs.Aggregate.{ AggregateDef, AggregateState, EventDatabaseWithFailure, Error }
import cats.~>

import lib.HList.{KMapper, kMap}
import shapeless._
import lib.HList.{ KMapper, kMap }
import shapeless.syntax.typeable._

object BatchRunner {
def forDb[Db: Backend](db: Db) = BatchRunner[Db, HNil.type](db, HNil)

}

final case class BatchRunner[Db: Backend, PROJS <: HList](db: Db, projections: PROJS) {
final case class BatchRunner[Db: Backend, PROJS <: HList](db: Db, projections: PROJS)(implicit m: KMapper[Projection, PROJS, Projection, PROJS]) {

type Self = BatchRunner[Db, PROJS]

type DbActions[A] = StateT[(Self, Error) Xor ?, Self, A]

def addProjection[D](proj: Projection[D])(implicit m: KMapper[Projection, PROJS, Projection, PROJS]): BatchRunner[Db, Projection[D] :: PROJS] = copy(projections = proj :: projections).runProjections
def addProjection[D](proj: Projection[D]): BatchRunner[Db, Projection[D] :: PROJS] = copy(projections = proj :: projections).runProjections

def db[E, A](actions: EventDatabaseWithFailure[E, A])(implicit eventSerialiser: EventSerialisation[E], m: KMapper[Projection, PROJS, Projection, PROJS]): DbActions[A] =
//TODO: rename dbs
def db[E, A](actions: DatabaseWithAggregateFailure[E, A])(implicit eventSerialiser: EventSerialisation[E]): DbActions[A] =
new DbActions[A](
Xor.right((runner: BatchRunner[Db, PROJS]) => {
val failureOrRes = Database.runDb(runner.db, actions)
val dbAndFailureOrRes = failureOrRes.fold(e => (db, Xor.left(e)), res => (res._1, Xor.right(res._2)))
dbAndFailureOrRes._2.fold(e => Xor.left((runner, e)), a => Xor.right((runner.copy(db = dbAndFailureOrRes._1).runProjections, a)))
Database.runDb(runner.db, actions.value) match {
case Xor.Left(err) => Xor.left((runner, DatabaseError(err)))
case Xor.Right((_, Xor.Left(err))) => Xor.left((runner, err))
case Xor.Right((newDb, Xor.Right(ret))) => Xor.right((runner.copy(db = newDb).runProjections, ret))
}
})
)

def db[E, A, S, AA](prev: (AggregateState[S], AA), aggregate: AggregateDef[E, S, A])(implicit eventSerialiser: EventSerialisation[E], m: KMapper[Projection, PROJS, Projection, PROJS]): DbActions[(AggregateState[S], A)] = {
val actions = aggregate.run(prev._1)
db(actions)
def db[E, A, S](aggregateState: AggregateState[S], aggregateActions: AggregateDef[E, S, A])(implicit eventSerialiser: EventSerialisation[E]): DbActions[(AggregateState[S], A)] = {
db(aggregateActions.run(aggregateState))
}

object runProjection extends (Projection ~> Projection) {
Expand All @@ -44,7 +45,18 @@ final case class BatchRunner[Db: Backend, PROJS <: HList](db: Db, projections: P
}
}

def runProjections(implicit m: KMapper[Projection, PROJS, Projection, PROJS]) = copy(projections = kMap(projections, runProjection))
def runProjections = copy(projections = kMap(projections, runProjection))

def getProjectionData[D: Typeable](name: String): Option[D] = {
def findProjection[P <: HList](current: P): Option[D] = current match {
case (x: Projection[_]) :: _ if x.name == name => x.data.cast[D]
case _ :: xs => findProjection(xs)
case HNil => None
}
findProjection(projections)
}

def run[A](actions: DbActions[A]): (Self, Error) Xor (Self, A) = actions.run(this)

def withDb(f: Db => Db) = copy(db = f(db))
}
42 changes: 37 additions & 5 deletions src/main/scala/Cqrs/Database.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,53 @@
package Cqrs

import Cqrs.Aggregate.{ EventDatabaseWithFailure, AggregateId, Tag, Error }
import cats.data.Xor
import Cqrs.Aggregate.{AggregateId, Tag}
import cats.{MonadError, Monad}
import cats.data.{XorT, Xor}
import cats.free.Free
import cats.free.Free.liftF

import scala.util.Try

object Database {

sealed trait Error
final case class ErrorDbFailure(message: String) extends Error
final case class EventDecodingFailure(rawData: String) extends Error
final case class ErrorDoesNotExist(id: AggregateId) extends Error
final case class ErrorUnexpectedVersion(id: AggregateId, currentVersion: Int, targetVersion: Int) extends Error

final case class VersionedEvents[E](version: Int, events: List[E])

sealed trait EventDatabaseOp[E, A]
final case class ReadAggregateExistence[E](tag: Tag, id: AggregateId) extends EventDatabaseOp[E, Error Xor Boolean]
final case class ReadAggregate[E](tag: Tag, id: AggregateId, fromVersion: Int) extends EventDatabaseOp[E, Error Xor List[VersionedEvents[E]]]
final case class AppendAggregateEvents[E](tag: Tag, id: AggregateId, events: VersionedEvents[E]) extends EventDatabaseOp[E, Error Xor Unit]

type EventDatabase[E, A] = Free[EventDatabaseOp[E, ?], A]
type EventDatabaseWithAnyFailure[E, Err, A] = XorT[EventDatabase[E, ?], Err, A]
type EventDatabaseWithFailure[E, A] = EventDatabaseWithAnyFailure[E, Error, A]

def lift[E, A](a: EventDatabaseOp[E, Error Xor A]): EventDatabaseWithFailure[E, A] =
XorT[EventDatabase[E, ?], Error, A](liftF[EventDatabaseOp[E, ?], Error Xor A](a))

def doesAggregateExist[E](tag: Tag, id: AggregateId): EventDatabaseWithFailure[E, Boolean] = lift(ReadAggregateExistence[E](tag, id))

def readNewEvents[E](tag: Tag, id: AggregateId, fromVersion: Int): EventDatabaseWithFailure[E, List[VersionedEvents[E]]] =
lift(ReadAggregate[E](tag, id, fromVersion))

def appendEvents[E](tag: Tag, id: AggregateId, events: VersionedEvents[E]): EventDatabaseWithFailure[E, Unit] =
lift(AppendAggregateEvents(tag, id, events))

implicit def eventDatabaseMonad[E]: Monad[EventDatabase[E, ?]] = Free.freeMonad[EventDatabaseOp[E, ?]]
implicit def eventDatabaseWithFailureMonad[E]: MonadError[EventDatabaseWithAnyFailure[E, ?, ?], Error] = XorT.xorTMonadError[EventDatabase[E, ?], Error]

/**
* Database backend typeclass exposing the DB API.
*
* @tparam Db specific database connection handle
*/
trait Backend[Db] {

/**
* Run aggregate interpreter
*
Expand All @@ -38,9 +73,6 @@ object Database {
def consumeDbEvents[D](database: Db, fromOperation: Int, initData: D, query: List[EventDataConsumerQuery[D]]): Error Xor (Int, D)
}

final case class ErrorDbFailure(message: String) extends Error
final case class EventDecodingFailure(rawData: String) extends Error

trait EventSerialisation[E] {
def encode(event: E): String
def decode(rawData: String): Error Xor E
Expand Down
Loading