diff --git a/build.sbt b/build.sbt index 83fb8e4..0d347c4 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,7 @@ import ReleaseTransformations._ lazy val buildSettings = Seq( organization := "com.github.zainab-ali", - crossScalaVersions := List("2.12.4", "2.11.12"), + crossScalaVersions := List("2.12.6", "2.11.12"), scalaVersion := crossScalaVersions.value.head, name := "fs2-reactive-streams" ) @@ -28,9 +28,9 @@ lazy val commonSettings = Seq( resolvers ++= commonResolvers, scalacOptions ++= commonScalacOptions, libraryDependencies ++= Seq( - "co.fs2" %% "fs2-core" % "0.10.1", + "co.fs2" %% "fs2-core" % "1.0.0-M1", "org.reactivestreams" % "reactive-streams" % "1.0.2", - "org.scalatest" %% "scalatest" % "3.0.4" % "test", + "org.scalatest" %% "scalatest" % "3.0.5" % "test", "org.scalacheck" %% "scalacheck" % "1.13.5" % "test", "org.reactivestreams" % "reactive-streams-tck" % "1.0.2" % "test" ) diff --git a/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala b/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala index ef2464a..489e797 100644 --- a/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala +++ b/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscriber.scala @@ -4,8 +4,8 @@ package reactivestreams import cats._ import cats.effect._ +import cats.effect.concurrent.{Deferred, Ref} import cats.implicits._ -import fs2.async.{Promise, Ref} import org.reactivestreams._ import scala.concurrent.ExecutionContext @@ -16,8 +16,8 @@ import scala.concurrent.ExecutionContext * * @see https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code */ -final class StreamSubscriber[F[_], A](val sub: StreamSubscriber.FSM[F, A])(implicit A: Effect[F], - ec: ExecutionContext) +final class StreamSubscriber[F[_], A](val sub: StreamSubscriber.FSM[F, A])(implicit F: ConcurrentEffect[F], + timer: Timer[F]) extends Subscriber[A] { /** Called by an upstream reactivestreams system */ @@ -48,7 +48,7 @@ final class StreamSubscriber[F[_], A](val sub: StreamSubscriber.FSM[F, A])(impli } object StreamSubscriber { - def apply[F[_], A](implicit AA: Effect[F], ec: ExecutionContext): F[StreamSubscriber[F, A]] = + def apply[F[_], A](implicit AA: ConcurrentEffect[F], timer: Timer[F]): F[StreamSubscriber[F, A]] = fsm[F, A].map(new StreamSubscriber(_)) /** A finite state machine describing the subscriber */ @@ -77,8 +77,7 @@ object StreamSubscriber { Stream.eval(dequeue1).repeat.rethrow.unNoneTerminate.onFinalize(onFinalize) } - private[reactivestreams] def fsm[F[_], A](implicit F: Effect[F], - ec: ExecutionContext): F[FSM[F, A]] = { + private[reactivestreams] def fsm[F[_], A](implicit F: Concurrent[F]): F[FSM[F, A]] = { type Out = Either[Throwable, Option[A]] @@ -88,13 +87,13 @@ object StreamSubscriber { case class OnError(e: Throwable) extends Input case object OnComplete extends Input case object OnFinalize extends Input - case class OnDequeue(response: Promise[F, Out]) extends Input + case class OnDequeue(response: Deferred[F, Out]) extends Input sealed trait State case object Uninitialized extends State case class Idle(sub: Subscription) extends State - case class RequestBeforeSubscription(req: Promise[F, Out]) extends State - case class WaitingOnUpstream(sub: Subscription, elementRequest: Promise[F, Out]) extends State + case class RequestBeforeSubscription(req: Deferred[F, Out]) extends State + case class WaitingOnUpstream(sub: Subscription, elementRequest: Deferred[F, Out]) extends State case object UpstreamCompletion extends State case object DownstreamCancellation extends State case class UpstreamError(err: Throwable) extends State @@ -135,17 +134,17 @@ object StreamSubscriber { } } - async.refOf[F, State](Uninitialized).map { ref => + Ref.of[F, State](Uninitialized).map { ref => new FSM[F, A] { - def nextState(in: Input): F[Unit] = ref.modify2(step(in)).flatMap(_._2) + def nextState(in: Input): F[Unit] = ref.modify(step(in)).flatten def onSubscribe(s: Subscription): F[Unit] = nextState(OnSubscribe(s)) def onNext(a: A): F[Unit] = nextState(OnNext(a)) def onError(t: Throwable): F[Unit] = nextState(OnError(t)) def onComplete: F[Unit] = nextState(OnComplete) def onFinalize: F[Unit] = nextState(OnFinalize) def dequeue1: F[Either[Throwable, Option[A]]] = - async.promise[F, Out].flatMap { p => - ref.modify2(step(OnDequeue(p))).flatMap(_._2) *> p.get + Deferred[F, Out].flatMap { p => + ref.modify(step(OnDequeue(p))).flatten *> p.get } } } diff --git a/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala b/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala index b5da5ad..104b096 100644 --- a/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala +++ b/core/src/main/scala/fs2/interop/reactivestreams/StreamSubscription.scala @@ -20,7 +20,7 @@ final class StreamSubscription[F[_], A]( cancelled: Signal[F, Boolean], sub: Subscriber[A], stream: Stream[F, A] -)(implicit F: Effect[F], ec: ExecutionContext) +)(implicit F: ConcurrentEffect[F], timer: Timer[F]) extends Subscription { import StreamSubscription._ @@ -92,8 +92,8 @@ object StreamSubscription { case object Infinite extends Request case class Finite(n: Long) extends Request - def apply[F[_]: Effect, A](sub: Subscriber[A], stream: Stream[F, A])( - implicit ec: ExecutionContext + def apply[F[_]: ConcurrentEffect, A](sub: Subscriber[A], stream: Stream[F, A])( + implicit timer: Timer[F] ): F[StreamSubscription[F, A]] = async.signalOf[F, Boolean](false).flatMap { cancelled => async.unboundedQueue[F, Request].map { requests => diff --git a/core/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala b/core/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala index ff5b624..940c6c5 100644 --- a/core/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala +++ b/core/src/main/scala/fs2/interop/reactivestreams/StreamUnicastPublisher.scala @@ -15,9 +15,9 @@ import scala.concurrent.ExecutionContext * @see https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code * */ -final class StreamUnicastPublisher[F[_]: Effect, A]( +final class StreamUnicastPublisher[F[_]: ConcurrentEffect, A]( val s: Stream[F, A] -)(implicit ec: ExecutionContext) +)(implicit timer: Timer[F]) extends Publisher[A] { def subscribe(subscriber: Subscriber[_ >: A]): Unit = { @@ -37,8 +37,8 @@ final class StreamUnicastPublisher[F[_]: Effect, A]( } object StreamUnicastPublisher { - def apply[F[_]: Effect, A]( + def apply[F[_]: ConcurrentEffect, A]( s: Stream[F, A] - )(implicit ec: ExecutionContext): StreamUnicastPublisher[F, A] = + )(implicit timer: Timer[F]): StreamUnicastPublisher[F, A] = new StreamUnicastPublisher(s) } diff --git a/core/src/main/scala/fs2/interop/reactivestreams/package.scala b/core/src/main/scala/fs2/interop/reactivestreams/package.scala index 3f8847f..a8f85f1 100644 --- a/core/src/main/scala/fs2/interop/reactivestreams/package.scala +++ b/core/src/main/scala/fs2/interop/reactivestreams/package.scala @@ -13,9 +13,9 @@ package object reactivestreams { * * The publisher only receives a subscriber when the stream is run. */ - def fromPublisher[F[_]: Effect, A]( + def fromPublisher[F[_]: ConcurrentEffect, A]( p: Publisher[A] - )(implicit ec: ExecutionContext): Stream[F, A] = + )(implicit timer: Timer[F]): Stream[F, A] = Stream .eval(StreamSubscriber[F, A].map { s => p.subscribe(s) @@ -26,7 +26,7 @@ package object reactivestreams { implicit final class PublisherOps[A](val pub: Publisher[A]) extends AnyVal { /** Creates a lazy stream from an org.reactivestreams.Publisher */ - def toStream[F[_]]()(implicit F: Effect[F], ec: ExecutionContext): Stream[F, A] = + def toStream[F[_]]()(implicit F: ConcurrentEffect[F], timer: Timer[F]): Stream[F, A] = fromPublisher(pub) } @@ -37,8 +37,8 @@ package object reactivestreams { * This publisher can only have a single subscription. * The stream is only ran when elements are requested. */ - def toUnicastPublisher()(implicit F: Effect[F], - ec: ExecutionContext): StreamUnicastPublisher[F, A] = + def toUnicastPublisher()(implicit F: ConcurrentEffect[F], + timer: Timer[F]): StreamUnicastPublisher[F, A] = StreamUnicastPublisher(stream) } } diff --git a/core/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala b/core/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala index e4e448e..39757c0 100644 --- a/core/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala +++ b/core/src/test/scala/fs2/interop/reactivestreams/SubscriberSpec.scala @@ -5,6 +5,7 @@ package reactivestreams import java.util.concurrent.atomic.AtomicInteger import cats.effect._ +import cats.implicits._ import org.reactivestreams._ import org.reactivestreams.tck.SubscriberWhiteboxVerification.{ SubscriberPuppet, @@ -77,10 +78,7 @@ class SubscriberBlackboxSpec with TestNGSuiteLike { implicit val ec: ExecutionContext = ExecutionContext.global - val (scheduler: Scheduler, _) = - Scheduler - .allocate[IO](corePoolSize = 2, threadPrefix = "subscriber-blackbox-spec-scheduler") - .unsafeRunSync() + val timer = Timer[IO] private val counter = new AtomicInteger() @@ -88,7 +86,7 @@ class SubscriberBlackboxSpec override def triggerRequest(s: Subscriber[_ >: Int]): Unit = { val req = s.asInstanceOf[StreamSubscriber[IO, Int]].sub.dequeue1 - (scheduler.sleep_[IO](100 milliseconds) ++ Stream.eval(req)).compile.drain.unsafeRunAsync(_ => ()) + (Stream.eval(timer.sleep(100.milliseconds) *> req)).compile.drain.unsafeRunAsync(_ => ()) } def createElement(i: Int): Int = counter.incrementAndGet() diff --git a/core/version.sbt b/core/version.sbt index f943617..3f478e0 100644 --- a/core/version.sbt +++ b/core/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.5.2-SNAPSHOT" +version in ThisBuild := "0.6.0-SNAPSHOT" diff --git a/project/build.properties b/project/build.properties index 394cb75..d6e3507 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.0.4 +sbt.version=1.1.6