Skip to content

Commit

Permalink
Upgrade to fs2-1.0.0-M1 (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
rossabaker authored and zainab-ali committed Jun 4, 2018
1 parent 01e5f02 commit 97c1591
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 35 deletions.
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import ReleaseTransformations._

lazy val buildSettings = Seq(
organization := "com.github.zainab-ali",
crossScalaVersions := List("2.12.4", "2.11.12"),
crossScalaVersions := List("2.12.6", "2.11.12"),
scalaVersion := crossScalaVersions.value.head,
name := "fs2-reactive-streams"
)
Expand All @@ -28,9 +28,9 @@ lazy val commonSettings = Seq(
resolvers ++= commonResolvers,
scalacOptions ++= commonScalacOptions,
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % "0.10.1",
"co.fs2" %% "fs2-core" % "1.0.0-M1",
"org.reactivestreams" % "reactive-streams" % "1.0.2",
"org.scalatest" %% "scalatest" % "3.0.4" % "test",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.scalacheck" %% "scalacheck" % "1.13.5" % "test",
"org.reactivestreams" % "reactive-streams-tck" % "1.0.2" % "test"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ package reactivestreams

import cats._
import cats.effect._
import cats.effect.concurrent.{Deferred, Ref}
import cats.implicits._
import fs2.async.{Promise, Ref}
import org.reactivestreams._

import scala.concurrent.ExecutionContext
Expand All @@ -16,8 +16,8 @@ import scala.concurrent.ExecutionContext
*
* @see https://github.com/reactive-streams/reactive-streams-jvm#2-subscriber-code
*/
final class StreamSubscriber[F[_], A](val sub: StreamSubscriber.FSM[F, A])(implicit A: Effect[F],
ec: ExecutionContext)
final class StreamSubscriber[F[_], A](val sub: StreamSubscriber.FSM[F, A])(implicit F: ConcurrentEffect[F],
timer: Timer[F])
extends Subscriber[A] {

/** Called by an upstream reactivestreams system */
Expand Down Expand Up @@ -48,7 +48,7 @@ final class StreamSubscriber[F[_], A](val sub: StreamSubscriber.FSM[F, A])(impli
}

object StreamSubscriber {
def apply[F[_], A](implicit AA: Effect[F], ec: ExecutionContext): F[StreamSubscriber[F, A]] =
def apply[F[_], A](implicit AA: ConcurrentEffect[F], timer: Timer[F]): F[StreamSubscriber[F, A]] =
fsm[F, A].map(new StreamSubscriber(_))

/** A finite state machine describing the subscriber */
Expand Down Expand Up @@ -77,8 +77,7 @@ object StreamSubscriber {
Stream.eval(dequeue1).repeat.rethrow.unNoneTerminate.onFinalize(onFinalize)
}

private[reactivestreams] def fsm[F[_], A](implicit F: Effect[F],
ec: ExecutionContext): F[FSM[F, A]] = {
private[reactivestreams] def fsm[F[_], A](implicit F: Concurrent[F]): F[FSM[F, A]] = {

type Out = Either[Throwable, Option[A]]

Expand All @@ -88,13 +87,13 @@ object StreamSubscriber {
case class OnError(e: Throwable) extends Input
case object OnComplete extends Input
case object OnFinalize extends Input
case class OnDequeue(response: Promise[F, Out]) extends Input
case class OnDequeue(response: Deferred[F, Out]) extends Input

sealed trait State
case object Uninitialized extends State
case class Idle(sub: Subscription) extends State
case class RequestBeforeSubscription(req: Promise[F, Out]) extends State
case class WaitingOnUpstream(sub: Subscription, elementRequest: Promise[F, Out]) extends State
case class RequestBeforeSubscription(req: Deferred[F, Out]) extends State
case class WaitingOnUpstream(sub: Subscription, elementRequest: Deferred[F, Out]) extends State
case object UpstreamCompletion extends State
case object DownstreamCancellation extends State
case class UpstreamError(err: Throwable) extends State
Expand Down Expand Up @@ -135,17 +134,17 @@ object StreamSubscriber {
}
}

async.refOf[F, State](Uninitialized).map { ref =>
Ref.of[F, State](Uninitialized).map { ref =>
new FSM[F, A] {
def nextState(in: Input): F[Unit] = ref.modify2(step(in)).flatMap(_._2)
def nextState(in: Input): F[Unit] = ref.modify(step(in)).flatten
def onSubscribe(s: Subscription): F[Unit] = nextState(OnSubscribe(s))
def onNext(a: A): F[Unit] = nextState(OnNext(a))
def onError(t: Throwable): F[Unit] = nextState(OnError(t))
def onComplete: F[Unit] = nextState(OnComplete)
def onFinalize: F[Unit] = nextState(OnFinalize)
def dequeue1: F[Either[Throwable, Option[A]]] =
async.promise[F, Out].flatMap { p =>
ref.modify2(step(OnDequeue(p))).flatMap(_._2) *> p.get
Deferred[F, Out].flatMap { p =>
ref.modify(step(OnDequeue(p))).flatten *> p.get
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class StreamSubscription[F[_], A](
cancelled: Signal[F, Boolean],
sub: Subscriber[A],
stream: Stream[F, A]
)(implicit F: Effect[F], ec: ExecutionContext)
)(implicit F: ConcurrentEffect[F], timer: Timer[F])
extends Subscription {
import StreamSubscription._

Expand Down Expand Up @@ -92,8 +92,8 @@ object StreamSubscription {
case object Infinite extends Request
case class Finite(n: Long) extends Request

def apply[F[_]: Effect, A](sub: Subscriber[A], stream: Stream[F, A])(
implicit ec: ExecutionContext
def apply[F[_]: ConcurrentEffect, A](sub: Subscriber[A], stream: Stream[F, A])(
implicit timer: Timer[F]
): F[StreamSubscription[F, A]] =
async.signalOf[F, Boolean](false).flatMap { cancelled =>
async.unboundedQueue[F, Request].map { requests =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import scala.concurrent.ExecutionContext
* @see https://github.com/reactive-streams/reactive-streams-jvm#1-publisher-code
*
*/
final class StreamUnicastPublisher[F[_]: Effect, A](
final class StreamUnicastPublisher[F[_]: ConcurrentEffect, A](
val s: Stream[F, A]
)(implicit ec: ExecutionContext)
)(implicit timer: Timer[F])
extends Publisher[A] {

def subscribe(subscriber: Subscriber[_ >: A]): Unit = {
Expand All @@ -37,8 +37,8 @@ final class StreamUnicastPublisher[F[_]: Effect, A](
}

object StreamUnicastPublisher {
def apply[F[_]: Effect, A](
def apply[F[_]: ConcurrentEffect, A](
s: Stream[F, A]
)(implicit ec: ExecutionContext): StreamUnicastPublisher[F, A] =
)(implicit timer: Timer[F]): StreamUnicastPublisher[F, A] =
new StreamUnicastPublisher(s)
}
10 changes: 5 additions & 5 deletions core/src/main/scala/fs2/interop/reactivestreams/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ package object reactivestreams {
*
* The publisher only receives a subscriber when the stream is run.
*/
def fromPublisher[F[_]: Effect, A](
def fromPublisher[F[_]: ConcurrentEffect, A](
p: Publisher[A]
)(implicit ec: ExecutionContext): Stream[F, A] =
)(implicit timer: Timer[F]): Stream[F, A] =
Stream
.eval(StreamSubscriber[F, A].map { s =>
p.subscribe(s)
Expand All @@ -26,7 +26,7 @@ package object reactivestreams {
implicit final class PublisherOps[A](val pub: Publisher[A]) extends AnyVal {

/** Creates a lazy stream from an org.reactivestreams.Publisher */
def toStream[F[_]]()(implicit F: Effect[F], ec: ExecutionContext): Stream[F, A] =
def toStream[F[_]]()(implicit F: ConcurrentEffect[F], timer: Timer[F]): Stream[F, A] =
fromPublisher(pub)
}

Expand All @@ -37,8 +37,8 @@ package object reactivestreams {
* This publisher can only have a single subscription.
* The stream is only ran when elements are requested.
*/
def toUnicastPublisher()(implicit F: Effect[F],
ec: ExecutionContext): StreamUnicastPublisher[F, A] =
def toUnicastPublisher()(implicit F: ConcurrentEffect[F],
timer: Timer[F]): StreamUnicastPublisher[F, A] =
StreamUnicastPublisher(stream)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package reactivestreams
import java.util.concurrent.atomic.AtomicInteger

import cats.effect._
import cats.implicits._
import org.reactivestreams._
import org.reactivestreams.tck.SubscriberWhiteboxVerification.{
SubscriberPuppet,
Expand Down Expand Up @@ -77,18 +78,15 @@ class SubscriberBlackboxSpec
with TestNGSuiteLike {
implicit val ec: ExecutionContext = ExecutionContext.global

val (scheduler: Scheduler, _) =
Scheduler
.allocate[IO](corePoolSize = 2, threadPrefix = "subscriber-blackbox-spec-scheduler")
.unsafeRunSync()
val timer = Timer[IO]

private val counter = new AtomicInteger()

def createSubscriber(): StreamSubscriber[IO, Int] = StreamSubscriber[IO, Int].unsafeRunSync()

override def triggerRequest(s: Subscriber[_ >: Int]): Unit = {
val req = s.asInstanceOf[StreamSubscriber[IO, Int]].sub.dequeue1
(scheduler.sleep_[IO](100 milliseconds) ++ Stream.eval(req)).compile.drain.unsafeRunAsync(_ => ())
(Stream.eval(timer.sleep(100.milliseconds) *> req)).compile.drain.unsafeRunAsync(_ => ())
}

def createElement(i: Int): Int = counter.incrementAndGet()
Expand Down
2 changes: 1 addition & 1 deletion core/version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.5.2-SNAPSHOT"
version in ThisBuild := "0.6.0-SNAPSHOT"
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.0.4
sbt.version=1.1.6

0 comments on commit 97c1591

Please sign in to comment.