Skip to content

Commit fc767b6

Browse files
committed
Fixed the try hang for both provider_chain and provider spread
- provider_loop still has issues with try scripts
1 parent 9dee403 commit fc767b6

File tree

2 files changed

+34
-7
lines changed

2 files changed

+34
-7
lines changed

lib/channel/src/lib.rs

+33-7
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ impl<T: Serialize> Channel<T> {
181181
self.on_demand_events.notify(1);
182182
}
183183

184+
/// notify all OnDemand with an event listener
185+
fn notify_all_on_demand(&self) {
186+
self.on_demand_events.notify(std::usize::MAX);
187+
}
188+
184189
/// create a listener so an OnDemand can get notice when demand has been requested
185190
/// (a receiver tried to receive but the queue was empty)
186191
fn on_demand_listen(&self) -> EventListener {
@@ -340,7 +345,13 @@ impl<T: Serialize> Sender<T> {
340345
impl<T: Serialize> Clone for Sender<T> {
341346
fn clone(&self) -> Self {
342347
let count = self.channel.increment_sender_count();
343-
info!("Sender::clone: {} new count: {}", self.name(), count);
348+
let on_demand_count = self.channel.on_demand_count();
349+
info!(
350+
"Sender::Clone channel {}, new count: {}, on_demand_count: {}",
351+
self.name(),
352+
count,
353+
on_demand_count
354+
);
344355
Sender {
345356
channel: self.channel.clone(),
346357
listener: None,
@@ -353,8 +364,15 @@ impl<T: Serialize> Clone for Sender<T> {
353364
impl<T: Serialize> Drop for Sender<T> {
354365
fn drop(&mut self) {
355366
let count = self.channel.decrement_sender_count();
356-
info!("Sender::drop: {} new count: {}", self.name(), count);
367+
let on_demand_count = self.channel.on_demand_count();
368+
info!(
369+
"Sender::Drop channel {}, new count: {}, on_demand_count: {}",
370+
self.name(),
371+
count,
372+
on_demand_count
373+
);
357374
if count == 0 {
375+
info!("Sender::Drop channel {}, notify_all_receivers", self.name());
358376
self.channel.notify_all_receivers();
359377
}
360378
}
@@ -431,7 +449,7 @@ impl<T: Serialize> Sink<T> for Sender<T> {
431449
if self.no_receivers() {
432450
self.listener = None;
433451
debug!(
434-
"Sink for Sender:poll_ready {} no_receivers, length: ${}",
452+
"Sink for Sender:poll_ready {} no_receivers, length: {}",
435453
self.name(),
436454
self.len()
437455
);
@@ -531,9 +549,10 @@ pub struct Receiver<T: Serialize> {
531549
impl<T: Serialize> Clone for Receiver<T> {
532550
fn clone(&self) -> Self {
533551
let count = self.channel.increment_receiver_count();
552+
let on_demand_count = self.channel.on_demand_count();
534553
debug!(
535-
"Receiver:Clone cloning channel {}, new count: {}",
536-
self.channel.name, count
554+
"Receiver:Clone channel {}, new count: {}, on_demand_count: {}",
555+
self.channel.name, count, on_demand_count
537556
);
538557
Receiver {
539558
channel: self.channel.clone(),
@@ -547,9 +566,10 @@ impl<T: Serialize> Clone for Receiver<T> {
547566
impl<T: Serialize> Drop for Receiver<T> {
548567
fn drop(&mut self) {
549568
let new_count = self.channel.decrement_receiver_count();
569+
let on_demand_count = self.channel.on_demand_count();
550570
debug!(
551-
"Receiver:Drop channel {}, new count: {}",
552-
self.channel.name, new_count
571+
"Receiver:Drop channel {}, new count: {}, on_demand_count: {}",
572+
self.channel.name, new_count, on_demand_count
553573
);
554574
if new_count == 0 {
555575
// notify all senders so they will see there are no more receivers
@@ -558,6 +578,8 @@ impl<T: Serialize> Drop for Receiver<T> {
558578
"Receiver:Drop channel {}, notify_all_senders",
559579
self.channel.name
560580
);
581+
// When there are no more receivers we need to notify the on_demand in addition to the normal senders
582+
self.channel.notify_all_on_demand();
561583
self.channel.notify_all_senders();
562584
}
563585
}
@@ -674,6 +696,10 @@ impl<T: Serialize + Send + 'static> Stream for OnDemandReceiver<T> {
674696
self.channel.len()
675697
);
676698
self.listener = None;
699+
debug!(
700+
"OnDemandReceiver::poll_next {} listener: None",
701+
self.channel.name
702+
);
677703
return Poll::Ready(None);
678704
}
679705

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,7 @@ fn create_try_run_future(
10131013

10141014
let mut test_ended_rx = BroadcastStream::new(test_ended_tx.subscribe());
10151015
let mut left = try_join_all(endpoint_calls).map(move |r| {
1016+
debug!("create_try_run_future try_join_all finish {:?}", r);
10161017
let _ = test_ended_tx.send(r.map(|_| TestEndReason::Completed));
10171018
});
10181019
let f = future::poll_fn(move |cx| match left.poll_unpin(cx) {

0 commit comments

Comments
 (0)