Skip to content

Commit 187352f

Browse files
committed
experiments
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
1 parent 2b187af commit 187352f

14 files changed

+492
-34
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ waker-fn = "^1.1"
7575

7676
[dev-dependencies]
7777
async-global-executor = "^2.0"
78+
async-io = "^2.0"
7879
futures-lite = "^2.0"
7980
serde_json = "^1.0"
8081
waker-fn = "^1.1"

examples/c.rs

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use futures_lite::StreamExt;
2+
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
3+
use tracing::info;
4+
5+
fn main() {
6+
if std::env::var("RUST_LOG").is_err() {
7+
unsafe { std::env::set_var("RUST_LOG", "info") };
8+
}
9+
10+
tracing_subscriber::fmt::init();
11+
12+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
13+
14+
async_global_executor::block_on(async {
15+
let conn = Connection::connect(&addr, ConnectionProperties::default())
16+
.await
17+
.expect("connection error");
18+
19+
info!("CONNECTED");
20+
21+
//receive channel
22+
let channel = conn.create_channel().await.expect("create_channel");
23+
info!(state=?conn.status().state());
24+
25+
let queue = channel
26+
.queue_declare(
27+
"hello-recover",
28+
QueueDeclareOptions::default(),
29+
FieldTable::default(),
30+
)
31+
.await
32+
.expect("queue_declare");
33+
info!(state=?conn.status().state());
34+
info!(?queue, "Declared queue");
35+
36+
info!("will consume");
37+
let mut consumer = channel
38+
.basic_consume(
39+
"hello-recover",
40+
"my_consumer",
41+
BasicConsumeOptions::default(),
42+
FieldTable::default(),
43+
)
44+
.await
45+
.expect("basic_consume");
46+
info!(state=?conn.status().state());
47+
48+
while let Some(delivery) = consumer.next().await {
49+
info!(message=?delivery, "received message");
50+
if let Ok(delivery) = delivery {
51+
delivery
52+
.ack(BasicAckOptions::default())
53+
.await
54+
.expect("basic_ack");
55+
}
56+
}
57+
})
58+
}

examples/p.rs

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use lapin::{options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties};
2+
use tracing::info;
3+
4+
fn main() {
5+
if std::env::var("RUST_LOG").is_err() {
6+
std::env::set_var("RUST_LOG", "info");
7+
}
8+
9+
tracing_subscriber::fmt::init();
10+
11+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
12+
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();
13+
14+
async_global_executor::block_on(async {
15+
let conn = Connection::connect(
16+
&addr,
17+
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
18+
)
19+
.await
20+
.expect("connection error");
21+
22+
info!("CONNECTED");
23+
24+
let channel1 = conn.create_channel().await.expect("create_channel");
25+
channel1
26+
.confirm_select(ConfirmSelectOptions::default())
27+
.await
28+
.expect("confirm_select");
29+
channel1
30+
.queue_declare(
31+
"hello-recover",
32+
QueueDeclareOptions::default(),
33+
FieldTable::default(),
34+
)
35+
.await
36+
.expect("queue_declare");
37+
38+
let ch = channel1.clone();
39+
async_global_executor::spawn(async move {
40+
loop {
41+
async_io::Timer::after(std::time::Duration::from_secs(1)).await;
42+
info!("Trigger failure");
43+
assert!(ch
44+
.queue_declare(
45+
"fake queue",
46+
QueueDeclareOptions {
47+
passive: true,
48+
..QueueDeclareOptions::default()
49+
},
50+
FieldTable::default(),
51+
)
52+
.await
53+
.is_err());
54+
}
55+
})
56+
.detach();
57+
58+
let mut published = 0;
59+
let mut errors = 0;
60+
info!("will publish");
61+
loop {
62+
let res = channel1
63+
.basic_publish(
64+
"",
65+
"recover-test",
66+
BasicPublishOptions::default(),
67+
b"before",
68+
BasicProperties::default(),
69+
)
70+
.await;
71+
let res = if let Ok(res) = res {
72+
res.await.map(|_| ())
73+
} else {
74+
res.map(|_| ())
75+
};
76+
match res {
77+
Ok(()) => {
78+
println!("GOT OK");
79+
published += 1;
80+
}
81+
Err(err) => {
82+
println!("GOT ERROR");
83+
if !err.is_amqp_soft_error() {
84+
panic!("{}", err);
85+
}
86+
errors += 1;
87+
if let Some(notifier) = err.notifier() {
88+
notifier.await
89+
}
90+
}
91+
}
92+
println!("Published {} with {} errors", published, errors);
93+
}
94+
});
95+
}

examples/t.rs

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use lapin::{
2+
message::DeliveryResult, options::*, publisher_confirm::Confirmation, types::FieldTable,
3+
BasicProperties, Connection, ConnectionProperties,
4+
};
5+
use tracing::info;
6+
7+
fn main() {
8+
if std::env::var("RUST_LOG").is_err() {
9+
std::env::set_var("RUST_LOG", "info");
10+
}
11+
12+
tracing_subscriber::fmt::init();
13+
14+
let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
15+
let recovery_config = lapin::experimental::RecoveryConfig::default().auto_recover_channels();
16+
17+
async_global_executor::block_on(async {
18+
let conn = Connection::connect(
19+
&addr,
20+
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
21+
)
22+
.await
23+
.expect("connection error");
24+
25+
info!("CONNECTED");
26+
27+
{
28+
let channel1 = conn.create_channel().await.expect("create_channel");
29+
let channel2 = conn.create_channel().await.expect("create_channel");
30+
channel1
31+
.confirm_select(ConfirmSelectOptions::default())
32+
.await
33+
.expect("confirm_select");
34+
channel1
35+
.queue_declare(
36+
"recover-test",
37+
QueueDeclareOptions::default(),
38+
FieldTable::default(),
39+
)
40+
.await
41+
.expect("queue_declare");
42+
43+
info!("will consume");
44+
let channel = channel2.clone();
45+
channel2
46+
.basic_consume(
47+
"recover-test",
48+
"my_consumer",
49+
BasicConsumeOptions::default(),
50+
FieldTable::default(),
51+
)
52+
.await
53+
.expect("basic_consume")
54+
.set_delegate(move |delivery: DeliveryResult| {
55+
let channel = channel.clone();
56+
async move {
57+
info!(message=?delivery, "received message");
58+
if let Ok(Some(delivery)) = delivery {
59+
delivery
60+
.ack(BasicAckOptions::default())
61+
.await
62+
.expect("basic_ack");
63+
if &delivery.data[..] == b"after" {
64+
channel
65+
.basic_cancel("my_consumer", BasicCancelOptions::default())
66+
.await
67+
.expect("basic_cancel");
68+
}
69+
}
70+
}
71+
});
72+
73+
info!("will publish");
74+
let confirm = channel1
75+
.basic_publish(
76+
"",
77+
"recover-test",
78+
BasicPublishOptions::default(),
79+
b"before",
80+
BasicProperties::default(),
81+
)
82+
.await
83+
.expect("basic_publish")
84+
.await
85+
.expect("publisher-confirms");
86+
assert_eq!(confirm, Confirmation::Ack(None));
87+
88+
info!("before fail");
89+
assert!(channel1
90+
.queue_declare(
91+
"fake queue",
92+
QueueDeclareOptions {
93+
passive: true,
94+
..QueueDeclareOptions::default()
95+
},
96+
FieldTable::default(),
97+
)
98+
.await
99+
.is_err());
100+
info!("after fail");
101+
102+
info!("publish after");
103+
let confirm = channel1
104+
.basic_publish(
105+
"",
106+
"recover-test",
107+
BasicPublishOptions::default(),
108+
b"after",
109+
BasicProperties::default(),
110+
)
111+
.await
112+
.expect("basic_publish")
113+
.await
114+
.expect("publisher-confirms");
115+
assert_eq!(confirm, Confirmation::Ack(None));
116+
}
117+
118+
conn.run().expect("conn.run");
119+
});
120+
}

src/acknowledgement.rs

+9
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ impl Acknowledgements {
6262
pub(crate) fn on_channel_error(&self, error: Error) {
6363
self.0.lock().on_channel_error(error);
6464
}
65+
66+
pub(crate) fn reset(&self, error: Error) {
67+
self.0.lock().reset(error);
68+
}
6569
}
6670

6771
impl fmt::Debug for Acknowledgements {
@@ -174,4 +178,9 @@ impl Inner {
174178
}
175179
}
176180
}
181+
182+
fn reset(&mut self, error: Error) {
183+
self.delivery_tag = IdSequence::new(false);
184+
self.on_channel_error(error);
185+
}
177186
}

src/channel.rs

+54-18
Original file line numberDiff line numberDiff line change
@@ -572,13 +572,17 @@ impl Channel {
572572
}
573573

574574
fn on_channel_close_ok_sent(&self, error: Option<Error>) {
575-
self.set_closed(
576-
error
577-
.clone()
578-
.unwrap_or(ErrorKind::InvalidChannelState(ChannelState::Closing).into()),
579-
);
580-
if let Some(error) = error {
581-
self.error_handler.on_error(error);
575+
if !self.recovery_config.auto_recover_channels
576+
|| !error.as_ref().map_or(false, |e| e.is_amqp_soft_error())
577+
{
578+
self.set_closed(
579+
error
580+
.clone()
581+
.unwrap_or(ErrorKind::InvalidChannelState(ChannelState::Closing).into()),
582+
);
583+
if let Some(error) = error {
584+
self.error_handler.on_error(error);
585+
}
582586
}
583587
}
584588

@@ -862,7 +866,19 @@ impl Channel {
862866
resolver: PromiseResolver<Channel>,
863867
channel: Channel,
864868
) -> Result<()> {
865-
self.set_state(ChannelState::Connected);
869+
if self.recovery_config.auto_recover_channels {
870+
self.status.update_recovery_context(|ctx| {
871+
ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
872+
self.frames.drop_frames_for_channel(channel.id, ctx.cause());
873+
self.acknowledgements.reset(ctx.cause());
874+
self.consumers.error(ctx.cause());
875+
});
876+
if !self.status.confirm() {
877+
self.status.finalize_recovery();
878+
}
879+
} else {
880+
self.set_state(ChannelState::Connected);
881+
}
866882
resolver.resolve(channel);
867883
Ok(())
868884
}
@@ -891,18 +907,38 @@ impl Channel {
891907
}
892908

893909
fn on_channel_close_received(&self, method: protocol::channel::Close) -> Result<()> {
894-
let error = AMQPError::try_from(method.clone()).map(|error| {
895-
error!(
896-
channel=%self.id, ?method, ?error,
897-
"Channel closed"
898-
);
899-
ErrorKind::ProtocolError(error).into()
900-
});
901-
self.set_closing(error.clone().ok());
910+
let error: std::result::Result<Error, _> =
911+
AMQPError::try_from(method.clone()).map(|error| {
912+
error!(
913+
channel=%self.id, ?method, ?error,
914+
"Channel closed"
915+
);
916+
ErrorKind::ProtocolError(error).into()
917+
});
918+
match (
919+
self.recovery_config.auto_recover_channels,
920+
error.clone().ok(),
921+
) {
922+
(true, Some(error)) if error.is_amqp_soft_error() => {
923+
self.status.set_reconnecting(error)
924+
}
925+
(_, err) => self.set_closing(err),
926+
}
902927
let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
903928
let channel = self.clone();
904-
self.internal_rpc
905-
.register_internal_future(async move { channel.channel_close_ok(error).await });
929+
self.internal_rpc.register_internal_future(async move {
930+
channel.channel_close_ok(error).await?;
931+
if channel.recovery_config.auto_recover_channels {
932+
let ch = channel.clone();
933+
channel.channel_open(ch).await?;
934+
if channel.status.confirm() {
935+
channel
936+
.confirm_select(ConfirmSelectOptions::default())
937+
.await?;
938+
}
939+
}
940+
Ok(())
941+
});
906942
Ok(())
907943
}
908944

0 commit comments

Comments
 (0)