Skip to content

Commit 6dbbb70

Browse files
authored
Pewpew try hang (#177)
* Simplified the provider names in example * Fixed the try hang for both provider_chain and provider spread - provider_loop still has issues with try scripts * Added example with provider collect * Added additional logging to find try script issues with provider_loop example * Fixed cargo fmt
1 parent 3e2ebf6 commit 6dbbb70

File tree

5 files changed

+143
-53
lines changed

5 files changed

+143
-53
lines changed

examples/provider_collect.yaml

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Run this file with the test-server, then specify the PORT as a parameter to this try or run script.
2+
3+
vars:
4+
port: "${PORT}"
5+
6+
load_pattern:
7+
- linear:
8+
from: 100%
9+
to: 100%
10+
over: 5s
11+
12+
providers:
13+
a:
14+
range: {}
15+
b:
16+
response: {}
17+
c:
18+
response: {}
19+
d:
20+
response: {}
21+
e:
22+
response: {}
23+
f:
24+
response: {}
25+
26+
loggers:
27+
test:
28+
to: stderr
29+
30+
endpoints:
31+
- method: POST
32+
url: http://localhost:${port}
33+
body: '{"a": ${a}}'
34+
provides:
35+
b:
36+
select: response.body.a
37+
on_demand: true
38+
39+
- method: POST
40+
declare:
41+
c: collect(b, 10) # Get 10
42+
url: http://localhost:${port}
43+
body: '{"c": ${c}}'
44+
provides:
45+
d:
46+
# select: response.body.c
47+
select: for_each[0]
48+
for_each:
49+
- response.body.c
50+
on_demand: true
51+
52+
- method: POST
53+
url: http://localhost:${port}
54+
body: '{"d": ${d}}'
55+
peak_load: 1hps
56+
logs:
57+
test:
58+
select: response.body.d

examples/provider_loop_with_counter.yaml

+2-3
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ providers:
1717
response: {}
1818
c:
1919
response: {}
20-
d:
21-
response: {}
2220

2321
loggers:
2422
test:
@@ -43,8 +41,9 @@ endpoints:
4341

4442
- method: POST
4543
url: http://localhost:${port}
46-
body: '{"b": ${b.value}}'
44+
body: '{"b": ${b.value},"counter": ${b.counter}}'
4745
peak_load: 5hps
46+
# on_demand: true # We can't do on_demand due to a bug where it can't figure out that we provider for ourselves
4847
provides:
4948
b:
5049
select:

examples/provider_spread.yaml

+33-40
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
vars:
44
port: "${PORT}"
5-
createName: 'test:loadtest-'
65
groupRate: 1
76
imagesPerGroup: 10
87

@@ -13,23 +12,19 @@ load_pattern:
1312
over: 5s
1413

1514
providers:
16-
group_range: # Counter for creating groups
15+
a: # Counter for creating groups
1716
range: {}
18-
image_range: # counter for creating images to put in groups
17+
x: # counter for creating images to put in groups
1918
range: {}
20-
group_created: # to continue the group APIs
19+
b: # to continue the group APIs
2120
response: {}
22-
group_created_for_images: # to create images
21+
b2: # to create images
2322
response: {}
24-
image_created: # to continue the image APIs
23+
y: # to continue the image APIs
2524
response: {}
26-
group_create_data:
25+
c:
2726
response: {}
28-
image_create_data:
29-
response: {}
30-
group_update_data:
31-
response: {}
32-
image_update_data:
27+
z:
3328
response: {}
3429

3530
loggers:
@@ -42,15 +37,14 @@ endpoints:
4237
tags:
4338
type: create group
4439
body: '{
45-
"id":"${createName}${start_pad(group_range, 6, "0")}",
46-
"name":"TEST-GROUP"
40+
"a":"${start_pad(a, 6, "0")}"
4741
}'
4842
provides:
49-
group_created:
50-
select: response.body.id
43+
b:
44+
select: response.body.a
5145
where: response.status == 200 || response.status == 409
52-
group_created_for_images:
53-
select: response.body.id
46+
b2:
47+
select: response.body.a
5448
for_each:
5549
- repeat(imagesPerGroup) # We need to create X copies so each image will have one
5650
where: response.status == 200 || response.status == 409
@@ -61,15 +55,14 @@ endpoints:
6155
tags:
6256
type: create image
6357
body: '{
64-
"id":"${createName}${start_pad(image_range, 8, "0")}",
65-
"groupId":"${group_created_for_images}",
66-
"name":"TEST-IMAGE"
58+
"x":"${start_pad(x, 8, "0")}",
59+
"b":"${b2}"
6760
}'
6861
provides:
69-
image_created:
62+
y:
7063
select:
71-
id: response.body.id
72-
groupId: response.body.groupId
64+
x: response.body.x
65+
b: response.body.b
7366
where: response.status == 200 || response.status == 409
7467
peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups
7568

@@ -78,14 +71,14 @@ endpoints:
7871
tags:
7972
type: create group data
8073
body: '{
81-
"id":"${group_created}",
74+
"b":"${b}",
8275
"data":{
83-
"subdata":"TEST-DATA"
76+
"subdata":"A-DATA"
8477
}
8578
}'
8679
provides:
87-
group_create_data:
88-
select: group_created
80+
c:
81+
select: response.body.b
8982
where: response.status == 200
9083
peak_load: ${groupRate}hps
9184

@@ -94,41 +87,41 @@ endpoints:
9487
tags:
9588
type: create image
9689
body: '{
97-
"id":"${image_created.id}",
98-
"groupId":"${image_created.groupId}",
90+
"x":"${y.x}",
91+
"b":"${y.b}",
9992
"data":{
100-
"subdata":"TEST-DATA"
93+
"subdata":"X-DATA"
10194
}
10295
}'
10396
provides:
104-
image_create_data:
105-
select: image_created # Puts in the whole object (id and groupId)
97+
z:
98+
select: y # Puts in the whole object (id and groupId)
10699
where: response.status == 200
107100
peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups
108101

109102
- method: PUT
110103
url: http://localhost:${port}
111104
body: '{
112-
"id":"${group_create_data}",
105+
"c":"${c}",
113106
"data":{
114-
"subdata":"UPDATED-TEST-DATA"
107+
"subdata":"UPDATED-A-DATA"
115108
}
116109
}'
117110
peak_load: ${groupRate}hps
118111
logs:
119112
test:
120-
select: response.body.id
113+
select: response.body.c
121114

122115
- method: PUT
123116
url: http://localhost:${port}
124117
body: '{
125-
"id":"${image_create_data.id}",
126-
"groupId":"${image_create_data.groupId}",
118+
"x":"${z.x}",
119+
"b":"${z.b}",
127120
"data":{
128-
"subdata":"UPDATED-TEST-DATA"
121+
"subdata":"UPDATED-X-DATA"
129122
}
130123
}'
131124
peak_load: ${groupRate * imagesPerGroup}hps # Needs to be a higher rate to keep up with groups
132125
logs:
133126
test:
134-
select: response.body.id
127+
select: response.body.x

lib/channel/src/lib.rs

+49-10
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,11 @@ impl<T: Serialize> Channel<T> {
181181
self.on_demand_events.notify(1);
182182
}
183183

184+
/// notify all OnDemand with an event listener
185+
fn notify_all_on_demand(&self) {
186+
self.on_demand_events.notify(std::usize::MAX);
187+
}
188+
184189
/// create a listener so an OnDemand can get notice when demand has been requested
185190
/// (a receiver tried to receive but the queue was empty)
186191
fn on_demand_listen(&self) -> EventListener {
@@ -340,7 +345,13 @@ impl<T: Serialize> Sender<T> {
340345
impl<T: Serialize> Clone for Sender<T> {
341346
fn clone(&self) -> Self {
342347
let count = self.channel.increment_sender_count();
343-
info!("Sender::clone: {} new count: {}", self.name(), count);
348+
let on_demand_count = self.channel.on_demand_count();
349+
info!(
350+
"Sender::Clone channel {}, new count: {}, on_demand_count: {}",
351+
self.name(),
352+
count,
353+
on_demand_count
354+
);
344355
Sender {
345356
channel: self.channel.clone(),
346357
listener: None,
@@ -353,8 +364,15 @@ impl<T: Serialize> Clone for Sender<T> {
353364
impl<T: Serialize> Drop for Sender<T> {
354365
fn drop(&mut self) {
355366
let count = self.channel.decrement_sender_count();
356-
info!("Sender::drop: {} new count: {}", self.name(), count);
367+
let on_demand_count = self.channel.on_demand_count();
368+
info!(
369+
"Sender::Drop channel {}, new count: {}, on_demand_count: {}",
370+
self.name(),
371+
count,
372+
on_demand_count
373+
);
357374
if count == 0 {
375+
info!("Sender::Drop channel {}, notify_all_receivers", self.name());
358376
self.channel.notify_all_receivers();
359377
}
360378
}
@@ -431,7 +449,7 @@ impl<T: Serialize> Sink<T> for Sender<T> {
431449
if self.no_receivers() {
432450
self.listener = None;
433451
debug!(
434-
"Sink for Sender:poll_ready {} no_receivers, length: ${}",
452+
"Sink for Sender:poll_ready {} no_receivers, length: {}",
435453
self.name(),
436454
self.len()
437455
);
@@ -531,9 +549,10 @@ pub struct Receiver<T: Serialize> {
531549
impl<T: Serialize> Clone for Receiver<T> {
532550
fn clone(&self) -> Self {
533551
let count = self.channel.increment_receiver_count();
552+
let on_demand_count = self.channel.on_demand_count();
534553
debug!(
535-
"Receiver:Clone cloning channel {}, new count: {}",
536-
self.channel.name, count
554+
"Receiver:Clone channel {}, new count: {}, on_demand_count: {}",
555+
self.channel.name, count, on_demand_count
537556
);
538557
Receiver {
539558
channel: self.channel.clone(),
@@ -547,9 +566,10 @@ impl<T: Serialize> Clone for Receiver<T> {
547566
impl<T: Serialize> Drop for Receiver<T> {
548567
fn drop(&mut self) {
549568
let new_count = self.channel.decrement_receiver_count();
569+
let on_demand_count = self.channel.on_demand_count();
550570
debug!(
551-
"Receiver:Drop channel {}, new count: {}",
552-
self.channel.name, new_count
571+
"Receiver:Drop channel {}, new count: {}, on_demand_count: {}",
572+
self.channel.name, new_count, on_demand_count
553573
);
554574
if new_count == 0 {
555575
// notify all senders so they will see there are no more receivers
@@ -558,6 +578,8 @@ impl<T: Serialize> Drop for Receiver<T> {
558578
"Receiver:Drop channel {}, notify_all_senders",
559579
self.channel.name
560580
);
581+
// When there are no more receivers we need to notify the on_demand in addition to the normal senders
582+
self.channel.notify_all_on_demand();
561583
self.channel.notify_all_senders();
562584
}
563585
}
@@ -570,13 +592,26 @@ impl<T: Serialize> Stream for Receiver<T> {
570592
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
571593
debug!(
572594
"Receiver:poll_next channel {}, length: {}",
573-
self.channel.name, self.channel.name
595+
self.channel.name,
596+
self.channel.len()
574597
);
575598
loop {
576599
if let Some(listener) = self.listener.as_mut() {
577600
match Pin::new(listener).poll(cx) {
578-
Poll::Ready(()) => self.listener = None,
579-
Poll::Pending => return Poll::Pending,
601+
Poll::Ready(()) => {
602+
debug!(
603+
"Receiver:poll_next channel {}, listener Poll::Ready, listener = None",
604+
self.channel.name
605+
);
606+
self.listener = None;
607+
}
608+
Poll::Pending => {
609+
debug!(
610+
"Receiver:poll_next channel {}, listener Poll::Pending",
611+
self.channel.name
612+
);
613+
return Poll::Pending;
614+
}
580615
}
581616
}
582617

@@ -674,6 +709,10 @@ impl<T: Serialize + Send + 'static> Stream for OnDemandReceiver<T> {
674709
self.channel.len()
675710
);
676711
self.listener = None;
712+
debug!(
713+
"OnDemandReceiver::poll_next {} listener: None",
714+
self.channel.name
715+
);
677716
return Poll::Ready(None);
678717
}
679718

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1013,6 +1013,7 @@ fn create_try_run_future(
10131013

10141014
let mut test_ended_rx = BroadcastStream::new(test_ended_tx.subscribe());
10151015
let mut left = try_join_all(endpoint_calls).map(move |r| {
1016+
debug!("create_try_run_future try_join_all finish {:?}", r);
10161017
let _ = test_ended_tx.send(r.map(|_| TestEndReason::Completed));
10171018
});
10181019
let f = future::poll_fn(move |cx| match left.poll_unpin(cx) {

0 commit comments

Comments
 (0)