Subscription request and cancel must enqueue synchronously #29

Closed
zainab-ali opened this issue Aug 18, 2017 · 16 comments

@zainab-ali
Owner

This surfaced in #28.

According to the Reactive Streams spec, calls to request after cancel must be no-ops.

However, the code for the two calls executes asynchronously. It's entirely possible for the calls to occur out of order, and thus for the subscriber to receive unwanted messages:

//cancel
F.runAsync(requests.enqueue1(Cancelled))(_ => IO.unit).unsafeRunSync
//request
F.runAsync(requests.enqueue1(request))(_ => IO.unit).unsafeRunSync

The unsafeRunSync (I think) doesn't cause the end IO to be run synchronously.

@aeons
Contributor

aeons commented Aug 18, 2017

Yeah, I've tried to wrap my head around the runAsync -> unsafeRunSync combination several times. As I understand it, it does cause the outer IO to be run synchronously, but the inner IO that actually does the work is still run asynchronously. I'm not sure there's an obvious way to do what we want here.

@rossabaker
Contributor

I fixed it like this: rossabaker@c1ec229. Request and cancel have to decrement the semaphore to enqueue and increment it to release the lock for the next. A semaphore felt like a heavy hammer, and I was happy to see my tests pass without it, but I just got (un)lucky.

@aeons
Contributor

aeons commented Aug 18, 2017

Didn't notice that you removed the fix again :)

@zainab-ali
Owner Author

Are you sure that fixes it? For me, it reduces the occurrences a great deal, but doesn't completely eradicate the failure (I'm running in a for loop to test).

A semaphore will ensure that the calls can't happen concurrently, but it doesn't ensure that they will happen in the correct order.
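To make the distinction concrete, here is a minimal sketch using only the JVM standard library rather than fs2. The names (`SemaphoreOrderSketch`, `enqueue`, `requestDone`) are hypothetical and not from the project. A latch simulates an unlucky scheduler that delays the cancel task, so the semaphore still serialises the two enqueues but cannot stop them landing in the wrong order.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Semaphore}

object SemaphoreOrderSketch {
  // Returns the order in which the two messages were enqueued.
  def run(): List[String] = {
    val lock = new Semaphore(1)
    val log = new ConcurrentLinkedQueue[String]()
    // Assumption for illustration: the scheduler happens to delay the
    // cancel task until the request task has finished.
    val requestDone = new CountDownLatch(1)

    // The enqueue is guarded by the semaphore, so it is never concurrent.
    def enqueue(msg: String): Unit = {
      lock.acquire()
      try { log.add(msg); () } finally lock.release()
    }

    val cancelTask = new Thread(new Runnable {
      def run(): Unit = { requestDone.await(); enqueue("cancel") }
    })
    val requestTask = new Thread(new Runnable {
      def run(): Unit = { enqueue("request"); requestDone.countDown() }
    })

    cancelTask.start()  // cancel() is called first...
    requestTask.start() // ...then request(n)
    cancelTask.join()
    requestTask.join()

    List(log.poll(), log.poll())
  }
}
```

Under that simulated schedule the request is enqueued ahead of the cancel even though cancel was called first, which is the failure still seen occasionally in the loop test.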

@rossabaker
Contributor

Yes, you're right. My patch is not sufficient.

@zainab-ali
Owner Author

It would work provided the semaphore was decremented synchronously.
For example, using a semaphore of IO:

  def cancel(): Unit = {
    semaphore.decrement.unsafeRunSync()
    F.runAsync(
      requests.enqueue1(Cancelled) >> semaphore.increment.liftIO[F]
    )(_ => IO.unit).unsafeRunSync()
  }

@zainab-ali zainab-ali added the bug label Aug 19, 2017
@zainab-ali
Owner Author

Because semaphore.decrement is itself asynchronous, we need a concrete IO to force it to run synchronously.
I don't like the solution above because it pollutes the code with a concrete IO type, which may not mix well with the end user's choice of effect.

It's possible to ensure a cancellation is synchronous by using a Ref as a cancellation flag and calling setSyncPure.
This is synchronous regardless of chosen F.

val flag: Ref[F, Boolean] = ???
def cancel(): Unit = {
  F.runAsync(flag.setSyncPure(true) >>
    requests.enqueue1(Cancelled)
  )(_ => IO.unit).unsafeRunSync()
}

def request(n: Long): Unit = {
  F.runAsync(flag.get >>= (cancelled =>
    if (cancelled) F.pure(())
    else requests.enqueue1(...)
  ))(_ => IO.unit).unsafeRunSync()
}

Getting the flag is async, but an async get after a synchronous set will always return the correct value.

So for the following code:

subscription.cancel()
subscription.request(1)
subscription.request(2)

cancel always takes effect before request(1), but request(1) and request(2) can still be enqueued in any order. This fixes the bug and still complies with the spec.
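The same idea can be sketched without fs2, using only the JVM standard library. This is an illustrative model, not the project's code: `SketchSubscription`, `cancelledFlag` and `awaitIdle` are hypothetical names, an `AtomicBoolean` stands in for the Ref, and a thread pool stands in for the asynchronous enqueue.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object FlagSketch {
  sealed trait Msg
  case object Cancelled extends Msg
  final case class Request(n: Long) extends Msg

  // Hypothetical stand-in for the subscription discussed above.
  final class SketchSubscription {
    private val pool = Executors.newFixedThreadPool(4)
    private val cancelledFlag = new AtomicBoolean(false)
    val requests = new ConcurrentLinkedQueue[Msg]()

    def cancel(): Unit = {
      // Set synchronously, before cancel() returns to the caller.
      cancelledFlag.set(true)
      // The enqueue itself may still happen later, on another thread.
      pool.execute(new Runnable {
        def run(): Unit = { requests.add(Cancelled); () }
      })
    }

    def request(n: Long): Unit =
      pool.execute(new Runnable {
        // The flag is read asynchronously, but any read that starts after
        // cancel() has returned is guaranteed to see `true`.
        def run(): Unit = {
          if (!cancelledFlag.get()) { requests.add(Request(n)); () }
        }
      })

    // Test helper: waits for all scheduled tasks to finish.
    def awaitIdle(): Unit = {
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
      ()
    }
  }
}
```

Calling `cancel()`, then `request(1)` and `request(2)`, leaves only `Cancelled` in the queue in this model, regardless of how the pool schedules the three tasks.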

@zainab-ali
Owner Author

This has been fixed in release v0.2.3. Backporting will require a bit of effort, since the fs2 constructs have changed a lot since 0.9.

@rossabaker
Contributor

👍 I like this solution. It works and the code is descriptive of the problem.

@zainab-ali
Owner Author

I need to make a few changes in fs2 as discussed here before backporting. It should be fairly straightforward.

@zainab-ali
Owner Author

As discussed in #31, the fs2 changes were unnecessary. I've merged in the backport and will release v0.1.1 shortly.

@zainab-ali
Owner Author

zainab-ali commented Aug 26, 2017

This has now been released in v0.1.1 🎉

@SystemFw
Contributor

I've just realised that I've spent a whole afternoon (during the subscription refactor) to understand a problem that was fully explained here 😞

@rossabaker
Contributor

Sorry about that. The test failure was familiar, but I didn't see the problem in your code. I should have pursued the history.

@SystemFw
Contributor

nah it's fine :)
I was a bit lazy in trying to understand the current code before rushing to rewrite it, so I deserved it :P

@zainab-ali
Owner Author

My mistake. I should've remembered this one and prompted you. It took me a while to figure out last time, but it slipped my mind 😞
