Skip to content

Commit 2827009

Browse files
committed
experiments
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
1 parent 8144442 commit 2827009

14 files changed

+493
-34
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ waker-fn = "^1.1"
7575

7676
[dev-dependencies]
7777
async-global-executor = "^2.0"
78+
async-io = "^2.0"
7879
futures-lite = "^2.0"
7980
serde_json = "^1.0"
8081
waker-fn = "^1.1"

examples/c.rs

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use futures_lite::StreamExt;
2+
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
3+
use tracing::info;
4+
5+
fn main() {
6+
if std::env::var("RUST_LOG").is_err() {
7+
unsafe { std::env::set_var("RUST_LOG", "info") };
8+
}
9+
10+
tracing_subscriber::fmt::init();
11+
12+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
13+
14+
async_global_executor::block_on(async {
15+
let conn = Connection::connect(&addr, ConnectionProperties::default())
16+
.await
17+
.expect("connection error");
18+
19+
info!("CONNECTED");
20+
21+
//receive channel
22+
let channel = conn.create_channel().await.expect("create_channel");
23+
info!(state=?conn.status().state());
24+
25+
let queue = channel
26+
.queue_declare(
27+
"hello-recover",
28+
QueueDeclareOptions::default(),
29+
FieldTable::default(),
30+
)
31+
.await
32+
.expect("queue_declare");
33+
info!(state=?conn.status().state());
34+
info!(?queue, "Declared queue");
35+
36+
info!("will consume");
37+
let mut consumer = channel
38+
.basic_consume(
39+
"hello-recover",
40+
"my_consumer",
41+
BasicConsumeOptions::default(),
42+
FieldTable::default(),
43+
)
44+
.await
45+
.expect("basic_consume");
46+
info!(state=?conn.status().state());
47+
48+
while let Some(delivery) = consumer.next().await {
49+
info!(message=?delivery, "received message");
50+
if let Ok(delivery) = delivery {
51+
delivery
52+
.ack(BasicAckOptions::default())
53+
.await
54+
.expect("basic_ack");
55+
}
56+
}
57+
})
58+
}

examples/p.rs

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
2+
use tracing::info;
3+
4+
fn main() {
5+
if std::env::var("RUST_LOG").is_err() {
6+
std::env::set_var("RUST_LOG", "info");
7+
}
8+
9+
tracing_subscriber::fmt::init();
10+
11+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
12+
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();
13+
14+
async_global_executor::block_on(async {
15+
let conn = Connection::connect(
16+
&addr,
17+
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
18+
)
19+
.await
20+
.expect("connection error");
21+
22+
info!("CONNECTED");
23+
24+
let channel1 = conn.create_channel().await.expect("create_channel");
25+
channel1
26+
.confirm_select(ConfirmSelectOptions::default())
27+
.await
28+
.expect("confirm_select");
29+
channel1
30+
.queue_declare(
31+
"hello-recover",
32+
QueueDeclareOptions::default(),
33+
FieldTable::default(),
34+
)
35+
.await
36+
.expect("queue_declare");
37+
38+
let ch = channel1.clone();
39+
async_global_executor::spawn(async move {
40+
loop {
41+
async_io::Timer::after(std::time::Duration::from_secs(1)).await;
42+
info!("Trigger failure");
43+
assert!(ch
44+
.queue_declare(
45+
"fake queue",
46+
QueueDeclareOptions {
47+
passive: true,
48+
..QueueDeclareOptions::default()
49+
},
50+
FieldTable::default(),
51+
)
52+
.await
53+
.is_err());
54+
}
55+
})
56+
.detach();
57+
58+
let mut published = 0;
59+
let mut errors = 0;
60+
info!("will publish");
61+
loop {
62+
let res = channel1
63+
.basic_publish(
64+
"",
65+
"recover-test",
66+
BasicPublishOptions::default(),
67+
b"before",
68+
BasicProperties::default(),
69+
)
70+
.await;
71+
let res = if let Ok(res) = res {
72+
res.await.map(|_| ())
73+
} else {
74+
res.map(|_| ())
75+
};
76+
match res {
77+
Ok(()) => {
78+
println!("GOT OK");
79+
published += 1;
80+
}
81+
Err(err) => {
82+
println!("GOT ERROR");
83+
let (soft, notifier) = err.is_amqp_soft_error();
84+
if !soft {
85+
panic!("{}", err);
86+
}
87+
errors += 1;
88+
if let Some(notifier) = notifier {
89+
notifier.await
90+
}
91+
}
92+
}
93+
println!("Published {} with {} errors", published, errors);
94+
}
95+
});
96+
}

examples/t.rs

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use lapin::{
2+
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
3+
BasicProperties, Connection, ConnectionProperties,
4+
};
5+
use tracing::info;
6+
7+
fn main() {
8+
if std::env::var("RUST_LOG").is_err() {
9+
std::env::set_var("RUST_LOG", "info");
10+
}
11+
12+
tracing_subscriber::fmt::init();
13+
14+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
15+
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();
16+
17+
async_global_executor::block_on(async {
18+
let conn = Connection::connect(
19+
&addr,
20+
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
21+
)
22+
.await
23+
.expect("connection error");
24+
25+
info!("CONNECTED");
26+
27+
{
28+
let channel1 = conn.create_channel().await.expect("create_channel");
29+
let channel2 = conn.create_channel().await.expect("create_channel");
30+
channel1
31+
.confirm_select(ConfirmSelectOptions::default())
32+
.await
33+
.expect("confirm_select");
34+
channel1
35+
.queue_declare(
36+
"recover-test",
37+
QueueDeclareOptions::default(),
38+
FieldTable::default(),
39+
)
40+
.await
41+
.expect("queue_declare");
42+
43+
info!("will consume");
44+
let channel = channel2.clone();
45+
channel2
46+
.basic_consume(
47+
"recover-test",
48+
"my_consumer",
49+
BasicConsumeOptions::default(),
50+
FieldTable::default(),
51+
)
52+
.await
53+
.expect("basic_consume")
54+
.set_delegate(move |delivery: DeliveryResult| {
55+
let channel = channel.clone();
56+
async move {
57+
info!(message=?delivery, "received message");
58+
if let Ok(Some(delivery)) = delivery {
59+
delivery
60+
.ack(BasicAckOptions::default())
61+
.await
62+
.expect("basic_ack");
63+
if &delivery.data[..] == b"after" {
64+
channel
65+
.basic_cancel("my_consumer", BasicCancelOptions::default())
66+
.await
67+
.expect("basic_cancel");
68+
}
69+
}
70+
}
71+
});
72+
73+
info!("will publish");
74+
let confirm = channel1
75+
.basic_publish(
76+
"",
77+
"recover-test",
78+
BasicPublishOptions::default(),
79+
b"before",
80+
BasicProperties::default(),
81+
)
82+
.await
83+
.expect("basic_publish")
84+
.await
85+
.expect("publisher-confirms");
86+
assert_eq!(confirm, Confirmation::Ack(None));
87+
88+
info!("before fail");
89+
assert!(channel1
90+
.queue_declare(
91+
"fake queue",
92+
QueueDeclareOptions {
93+
passive: true,
94+
..QueueDeclareOptions::default()
95+
},
96+
FieldTable::default(),
97+
)
98+
.await
99+
.is_err());
100+
info!("after fail");
101+
102+
info!("publish after");
103+
let confirm = channel1
104+
.basic_publish(
105+
"",
106+
"recover-test",
107+
BasicPublishOptions::default(),
108+
b"after",
109+
BasicProperties::default(),
110+
)
111+
.await
112+
.expect("basic_publish")
113+
.await
114+
.expect("publisher-confirms");
115+
assert_eq!(confirm, Confirmation::Ack(None));
116+
}
117+
118+
conn.run().expect("conn.run");
119+
});
120+
}

src/acknowledgement.rs

+9
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ impl Acknowledgements {
6262
pub(crate) fn on_channel_error(&self, error: Error) {
6363
self.0.lock().on_channel_error(error);
6464
}
65+
66+
pub(crate) fn reset(&self, error: Error) {
67+
self.0.lock().reset(error);
68+
}
6569
}
6670

6771
impl fmt::Debug for Acknowledgements {
@@ -174,4 +178,9 @@ impl Inner {
174178
}
175179
}
176180
}
181+
182+
fn reset(&mut self, error: Error) {
183+
self.delivery_tag = IdSequence::new(false);
184+
self.on_channel_error(error);
185+
}
177186
}

0 commit comments

Comments
 (0)