
Commit 9fd9990

Pewpew try hang (#177) (#178)
* Simplified the provider names in example
* Fixed the try hang for both provider_chain and provider spread - provider_loop still has issues with try scripts
* Added example with provider collect
* Added additional logging to find try script issues with provider_loop example
* Fixed cargo fmt
1 parent d914a5e commit 9fd9990

8 files changed (+238, -92 lines)

examples/provider_collect.yaml (+62, new file)

@@ -0,0 +1,62 @@
+# Run this file with the test-server, then specify the PORT as a parameter to this try or run script.
+
+vars:
+  port: ${e:PORT}
+load_pattern:
+  - !linear
+      from: 100%
+      to: 100%
+      over: 5s
+
+providers:
+  a: !range
+  b: !response
+    buffer: 10
+  c: !response
+  d: !response
+  e: !response
+  f: !response
+
+loggers:
+  test:
+    to: stderr
+
+endpoints:
+  - method: POST
+    url: http://localhost:${v:port}
+    body: !str '{"a": ${p:a}}'
+    provides:
+      b:
+        query:
+          select: response.body.a
+        send: block
+    # on_demand: true
+
+  - method: POST
+    declare:
+      c: !c
+        collects:
+          - take: 10
+            from: ${p:b}
+            as: _c
+        then: ${p:_c}
+    url: http://localhost:${v:port}
+    body: !str '{"c": ${p:c}}'
+    # peak_load: 1hps
+    provides:
+      d:
+        query:
+          # select: response.body.c
+          select: for_each[0]
+          for_each:
+            - response.body.c
+        send: block
+    on_demand: true
+
+  - method: POST
+    url: http://localhost:${v:port}
+    body: !str '{"d": ${p:d}}'
+    peak_load: 1hps
+    logs:
+      test:
+        select: response.body.d

examples/provider_loop_with_counter.yaml (+2, -2)

@@ -14,7 +14,6 @@ providers:
   a: !range
   b: !response
   c: !response
-  d: !response
 
 loggers:
   test:
@@ -41,8 +40,9 @@ endpoints:
 
   - method: POST
     url: http://localhost:${v:port}
-    body: !str '{"b": ${x:${p:b}.value}}'
+    body: !str '{"b": ${x:${p:b}.value}, "counter": ${x:${p:b}.counter}}'
     peak_load: 5hps
+    # on_demand: true # We can't do on_demand due to a bug where it can't figure out that we provider for ourselves
     provides:
       b:
         query:

examples/provider_spread.yaml (+33, -38)

@@ -2,7 +2,6 @@
 
 vars:
   port: "${e:PORT}"
-  createName: 'test:loadtest-'
   groupRate: 1
   imagesPerGroup: 10
 
@@ -13,20 +12,18 @@ load_pattern:
     over: 5s
 
 providers:
-  group_range: # Counter for creating groups
+  a: # Counter for creating groups
     !range
-  image_range: # counter for creating images to put in groups
+  x: # counter for creating images to put in groups
     !range
-  group_created: # to continue the group APIs
+  b: # to continue the group APIs
     !response
-  group_created_for_images: # to create images
+  b2: # to create images
     !response
-  image_created: # to continue the image APIs
+  y: # to continue the image APIs
     !response
-  group_create_data: !response
-  image_create_data: !response
-  group_update_data: !response
-  image_update_data: !response
+  c: !response
+  z: !response
 
 loggers:
   test:
@@ -38,18 +35,17 @@ endpoints:
     tags:
       type: create group
     body: !str '{
-        "id":"${v:createName}${x:start_pad(${p:group_range}, 6, "0")}",
-        "name":"TEST-GROUP"
+        "a":"${x:start_pad(${p:a}, 6, "0")}"
       }'
     provides:
-      group_created:
+      b:
        query:
-          select: response.body.id
+          select: response.body.a
          where: response.status == 200 || response.status == 409
        send: if_not_full
-      group_created_for_images:
+      b2:
        query:
-          select: response.body.id
+          select: response.body.a
          for_each:
            - repeat(_v.imagesPerGroup) # We need to create X copies so each image will have one
          where: response.status == 200 || response.status == 409
@@ -61,16 +57,15 @@ endpoints:
     tags:
       type: create image
     body: !str '{
-        "id":"${v:createName}${x:start_pad(${p:image_range}, 8, "0")}",
-        "groupId":"${p:group_created_for_images}",
-        "name":"TEST-IMAGE"
+        "x":"${x:start_pad(${p:x}, 8, "0")}",
+        "b":"${p:b2}"
       }'
     provides:
-      image_created:
+      y:
        query:
          select:
-            id: response.body.id
-            groupId: response.body.groupId
+            x: response.body.x
+            b: response.body.b
          where: response.status == 200 || response.status == 409
        send: if_not_full
    peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups
@@ -80,15 +75,15 @@ endpoints:
     tags:
       type: create group data
     body: !str '{
-        "id":"${p:group_created}",
+        "b":"${p:b}",
        "data":{
-          "subdata":"TEST-DATA"
+          "subdata":"A-DATA"
        }
      }'
    provides:
-      group_create_data:
+      c:
        query:
-          select: group_created
+          select: response.body.b
          where: response.status == 200
        send: if_not_full
    peak_load: ${v:groupRate}hps
@@ -98,43 +93,43 @@ endpoints:
     tags:
       type: create image
     body: !str '{
-        "id":"${x:${p:image_created}.id}",
-        "groupId":"${x:${p:image_created}.groupId}",
+        "x":"${x:${p:y}.x}",
+        "b":"${x:${p:y}.b}",
        "data":{
-          "subdata":"TEST-DATA"
+          "subdata":"X-DATA"
        }
      }'
    provides:
-      image_create_data:
+      z:
        query:
-          select: image_created # Puts in the whole object (id and groupId)
+          select: y # Puts in the whole object (id and groupId)
          where: response.status == 200
        send: if_not_full
    peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups
 
   - method: PUT
     url: http://localhost:${v:port}
     body: !str '{
-        "id":"${p:group_create_data}",
+        "c":"${p:c}",
        "data":{
-          "subdata":"UPDATED-TEST-DATA"
+          "subdata":"UPDATED-A-DATA"
        }
      }'
    peak_load: ${v:groupRate}hps
    logs:
      test:
-        select: response.body.id
+        select: response.body.c
 
   - method: PUT
     url: http://localhost:${v:port}
     body: !str '{
-        "id":"${x:${p:image_create_data}.id}",
-        "groupId":"${x:${p:image_create_data}.groupId}",
+        "x":"${x:${p:z}.x}",
+        "b":"${x:${p:z}.b}",
        "data":{
-          "subdata":"UPDATED-TEST-DATA"
+          "subdata":"UPDATED-X-DATA"
        }
      }'
    peak_load: ${x:${v:groupRate} * ${v:imagesPerGroup}}hps # Needs to be a higher rate to keep up with groups
    logs:
      test:
-        select: response.body.id
+        select: response.body.x

lib/channel/src/lib.rs (+47, -9)

@@ -181,6 +181,11 @@ impl<T: Serialize> Channel<T> {
         self.on_demand_events.notify(1);
     }
 
+    /// notify all OnDemand with an event listener
+    fn notify_all_on_demand(&self) {
+        self.on_demand_events.notify(std::usize::MAX);
+    }
+
     /// create a listener so an OnDemand can get notice when demand has been requested
     /// (a receiver tried to receive but the queue was empty)
     fn on_demand_listen(&self) -> EventListener {
@@ -340,7 +345,13 @@ impl<T: Serialize> Sender<T> {
 impl<T: Serialize> Clone for Sender<T> {
     fn clone(&self) -> Self {
         let count = self.channel.increment_sender_count();
-        info!("Sender::clone: {} new count: {}", self.name(), count);
+        let on_demand_count = self.channel.on_demand_count();
+        info!(
+            "Sender::Clone channel {}, new count: {}, on_demand_count: {}",
+            self.name(),
+            count,
+            on_demand_count
+        );
         Sender {
             channel: self.channel.clone(),
             listener: None,
@@ -353,8 +364,15 @@ impl<T: Serialize> Clone for Sender<T> {
 impl<T: Serialize> Drop for Sender<T> {
     fn drop(&mut self) {
         let count = self.channel.decrement_sender_count();
-        info!("Sender::drop: {} new count: {}", self.name(), count);
+        let on_demand_count = self.channel.on_demand_count();
+        info!(
+            "Sender::Drop channel {}, new count: {}, on_demand_count: {}",
+            self.name(),
+            count,
+            on_demand_count
+        );
         if count == 0 {
+            info!("Sender::Drop channel {}, notify_all_receivers", self.name());
             self.channel.notify_all_receivers();
         }
     }
@@ -431,7 +449,7 @@ impl<T: Serialize> Sink<T> for Sender<T> {
         if self.no_receivers() {
             self.listener = None;
             debug!(
-                "Sink for Sender:poll_ready {} no_receivers, length: ${}",
+                "Sink for Sender:poll_ready {} no_receivers, length: {}",
                 self.name(),
                 self.len()
             );
@@ -531,9 +549,10 @@ pub struct Receiver<T: Serialize> {
 impl<T: Serialize> Clone for Receiver<T> {
     fn clone(&self) -> Self {
         let count = self.channel.increment_receiver_count();
+        let on_demand_count = self.channel.on_demand_count();
         debug!(
-            "Receiver:Clone cloning channel {}, new count: {}",
-            self.channel.name, count
+            "Receiver:Clone channel {}, new count: {}, on_demand_count: {}",
+            self.channel.name, count, on_demand_count
         );
         Receiver {
             channel: self.channel.clone(),
@@ -547,9 +566,10 @@ impl<T: Serialize> Clone for Receiver<T> {
 impl<T: Serialize> Drop for Receiver<T> {
     fn drop(&mut self) {
         let new_count = self.channel.decrement_receiver_count();
+        let on_demand_count = self.channel.on_demand_count();
         debug!(
-            "Receiver:Drop channel {}, new count: {}",
-            self.channel.name, new_count
+            "Receiver:Drop channel {}, new count: {}, on_demand_count: {}",
+            self.channel.name, new_count, on_demand_count
         );
         if new_count == 0 {
             // notify all senders so they will see there are no more receivers
@@ -558,6 +578,8 @@ impl<T: Serialize> Drop for Receiver<T> {
                 "Receiver:Drop channel {}, notify_all_senders",
                 self.channel.name
             );
+            // When there are no more receivers we need to notify the on_demand in addition to the normal senders
+            self.channel.notify_all_on_demand();
             self.channel.notify_all_senders();
         }
     }
@@ -576,8 +598,20 @@ impl<T: Serialize> Stream for Receiver<T> {
         loop {
             if let Some(listener) = self.listener.as_mut() {
                 match Pin::new(listener).poll(cx) {
-                    Poll::Ready(()) => self.listener = None,
-                    Poll::Pending => return Poll::Pending,
+                    Poll::Ready(()) => {
+                        debug!(
+                            "Receiver:poll_next channel {}, listener Poll::Ready, listener = None",
+                            self.channel.name
+                        );
+                        self.listener = None;
+                    }
+                    Poll::Pending => {
+                        debug!(
+                            "Receiver:poll_next channel {}, listener Poll::Pending",
+                            self.channel.name
+                        );
+                        return Poll::Pending;
+                    }
                 }
             }
 
@@ -675,6 +709,10 @@ impl<T: Serialize + Send + 'static> Stream for OnDemandReceiver<T> {
                 self.channel.len()
             );
             self.listener = None;
+            debug!(
+                "OnDemandReceiver::poll_next {} listener: None",
+                self.channel.name
+            );
             return Poll::Ready(None);
         }

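Note on the fix above: notify(1) wakes at most one parked listener, while notify(usize::MAX) wakes every registered listener, so the notify_all_on_demand call in Receiver::drop lets every waiting OnDemand sender observe that the last receiver is gone instead of hanging. The sketch below is not part of this commit; it assumes on_demand_events is an event_listener::Event from the event-listener crate (2.x style API), which the EventListener return type and the notify(n) calls above suggest, and the waiter threads are purely illustrative.

// Minimal sketch, assuming the event-listener 2.x crate is a dependency.
use event_listener::Event;
use std::thread;

fn main() {
    let event = Event::new();
    let mut handles = Vec::new();

    for id in 0..3 {
        // Register each listener before spawning so a notification sent later is not missed.
        let listener = event.listen();
        handles.push(thread::spawn(move || {
            // Parks until this listener is notified.
            listener.wait();
            println!("waiter {} woke up", id);
        }));
    }

    // notify(1) would wake a single waiter; notify(usize::MAX) wakes them all,
    // which is what notify_all_on_demand relies on when the last Receiver drops.
    event.notify(std::usize::MAX);

    for handle in handles {
        handle.join().expect("waiter thread panicked");
    }
}

With notify(usize::MAX) all three waiters print and exit; with notify(1) only one would, which is the shape of the try hang this commit addresses.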