Skip to content

Commit 09d28de

Browse files
authored
change ci cd pipeline to let it pass through nuclei (#248)
1 parent d594edd commit 09d28de

File tree

10 files changed

+178
-91
lines changed

10 files changed

+178
-91
lines changed

.github/workflows/ci.yml

+7-8
Original file line numberDiff line numberDiff line change
@@ -59,29 +59,28 @@ jobs:
5959
uses: actions-rs/cargo@v1
6060
with:
6161
command: check
62-
args: --all --bins --examples --tests
63-
64-
- name: tests windows
65-
if: matrix.os != 'windows-latest'
66-
uses: actions-rs/cargo@v1
67-
with:
68-
command: test
69-
args: --all
62+
args: --bins --examples --tests
7063

7164
- name: tests windows
65+
env:
66+
RUST_BACKTRACE: full
7267
if: matrix.os == 'windows-latest'
7368
uses: actions-rs/cargo@v1
7469
with:
7570
command: test
7671

7772
- name: tests nightly osx
73+
env:
74+
RUST_BACKTRACE: full
7875
if: ${{ matrix.version == 'nightly' && matrix.os == 'macOS-latest' }}
7976
uses: actions-rs/cargo@v1
8077
with:
8178
command: test
8279
args: --all --all-features
8380

8481
- name: tests nightly linux
82+
env:
83+
RUST_BACKTRACE: full
8584
if: ${{ matrix.version == 'nightly' && matrix.os == 'ubuntu-latest' }}
8685
uses: actions-rs/cargo@v1
8786
with:

src/bastion/Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ docs = ["distributed", "scaling", "default"]
4949
features = ["docs"]
5050
rustdoc-args = ["--cfg", "feature=\"docs\""]
5151

52-
5352
[dependencies]
54-
nuclei = "0.1.1"
5553
bastion-executor = "0.3.5"
5654
lightproc = "0.3.5"
5755
# bastion-executor = { version = "= 0.3.5-alpha", path = "../bastion-executor" }
@@ -77,6 +75,9 @@ tracing-subscriber = "0.2.6"
7775
tracing = "0.1.15"
7876
anyhow = "1.0.31"
7977

78+
[target.'cfg(not(windows))'.dependencies]
79+
nuclei = "0.1.1"
80+
8081
[dev-dependencies]
8182
env_logger = "0.7"
8283
proptest = "0.10"

src/bastion/examples/distributed-fwrite.rs

+9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use bastion::prelude::*;
22
use futures::*;
33
use std::fs::{File, OpenOptions};
4+
#[cfg(target_os = "windows")]
5+
use std::io::Write;
46
use std::path::PathBuf;
57
use std::sync::Arc;
68

@@ -63,7 +65,11 @@ fn main() {
6365
.write(true)
6466
.open(&path)
6567
.unwrap();
68+
69+
#[cfg(not(target_os = "windows"))]
6670
let mut file = Handle::<File>::new(fo).unwrap();
71+
#[cfg(target_os = "windows")]
72+
let mut file = fo;
6773

6874
// Distribute your workload to workers
6975
for id_worker_pair in workers.elems().iter().enumerate() {
@@ -74,6 +80,9 @@ fn main() {
7480
msg: u64 => {
7581
// Handle the answer...
7682
println!("Source received the computed value: {}", msg);
83+
#[cfg(target_os = "windows")]
84+
file.write_all(msg.to_string().as_bytes()).unwrap();
85+
#[cfg(not(target_os = "windows"))]
7786
file.write_all(msg.to_string().as_bytes()).await.unwrap();
7887
};
7988
_: _ => ();

src/bastion/examples/scaling_groups.rs

+37-26
Original file line numberDiff line numberDiff line change
@@ -35,44 +35,55 @@ fn auto_resize_group_supervisor(supervisor: Supervisor) -> Supervisor {
3535
}
3636

3737
fn input_group(children: Children) -> Children {
38-
children
39-
.with_redundancy(1)
40-
.with_resizer(OptimalSizeExploringResizer::default().with_lower_bound(0)) // Don't start new actors after finishing execution
41-
.with_exec(move |ctx: BastionContext| async move {
42-
println!("[Input] Worker started!");
43-
44-
let mut messages_sent = 0;
45-
static INPUT: [u64; 5] = [5u64, 1, 2, 4, 3];
46-
let group_name = "Processing".to_string();
47-
let target = BroadcastTarget::Group(group_name);
48-
49-
while messages_sent != 1000 {
50-
// Emulate the workload. The number means how
51-
// long it must wait before processing.
52-
for value in INPUT.iter() {
53-
ctx.broadcast_message(target.clone(), value);
54-
Delay::new(Duration::from_millis(75 * value)).await;
55-
}
56-
57-
messages_sent += INPUT.len();
38+
// we would have fully chained the children builder if it wasn't for the feature flag
39+
let mut children = children.with_redundancy(1);
40+
#[cfg(feature = "scaling")]
41+
{
42+
// Don't start new actors after finishing execution
43+
children =
44+
children.with_resizer(OptimalSizeExploringResizer::default().with_lower_bound(0));
45+
}
46+
children.with_exec(move |ctx: BastionContext| async move {
47+
println!("[Input] Worker started!");
48+
49+
let mut messages_sent = 0;
50+
static INPUT: [u64; 5] = [5u64, 1, 2, 4, 3];
51+
let group_name = "Processing".to_string();
52+
let target = BroadcastTarget::Group(group_name);
53+
54+
while messages_sent != 1000 {
55+
// Emulate the workload. The number means how
56+
// long it must wait before processing.
57+
for value in INPUT.iter() {
58+
ctx.broadcast_message(target.clone(), value);
59+
Delay::new(Duration::from_millis(75 * value)).await;
5860
}
5961

60-
Ok(())
61-
})
62+
messages_sent += INPUT.len();
63+
}
64+
65+
Ok(())
66+
})
6267
}
6368

6469
fn auto_resize_group(children: Children) -> Children {
65-
children
70+
// we would have fully chained the children builder if it wasn't for the feature flag
71+
let mut children = children
6672
.with_redundancy(3) // Start with 3 actors
67-
.with_heartbeat_tick(Duration::from_secs(5)) // Do heartbeat each 5 seconds
68-
.with_resizer(
73+
.with_heartbeat_tick(Duration::from_secs(5)); // Do heartbeat each 5 seconds
74+
75+
#[cfg(feature = "scaling")]
76+
{
77+
children = children.with_resizer(
6978
OptimalSizeExploringResizer::default()
7079
.with_lower_bound(1) // A minimal acceptable size of group
7180
.with_upper_bound(UpperBound::Limit(10)) // Max 10 actors in runtime
7281
.with_upscale_strategy(UpscaleStrategy::MailboxSizeThreshold(3)) // Scale up when a half of actors have more than 3 messages
7382
.with_upscale_rate(0.1) // Increase the size of group on 10%, if necessary to scale up
7483
.with_downscale_rate(0.2), // Decrease the size of group on 20%, if too many free actors
75-
)
84+
);
85+
}
86+
children
7687
.with_dispatcher(Dispatcher::with_type(DispatcherType::Named(
7788
"Processing".to_string(),
7889
)))

src/bastion/examples/tcp-servers.rs

+60-22
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,62 @@
11
use bastion::prelude::*;
2+
#[cfg(not(target_os = "windows"))]
23
use futures::io;
3-
use std::net::{TcpListener, TcpStream};
4+
#[cfg(target_os = "windows")]
5+
use std::io::{self, Read, Write};
6+
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
47
use std::sync::atomic::{AtomicUsize, Ordering};
58

9+
#[cfg(not(target_os = "windows"))]
10+
async fn run(addr: impl ToSocketAddrs) -> io::Result<()> {
11+
let listener = Handle::<TcpListener>::bind(addr).unwrap();
12+
println!("Listening on {}", listener.get_ref().local_addr().unwrap());
13+
14+
// Accept clients in a loop.
15+
loop {
16+
let (stream, peer_addr) = listener.accept().await.unwrap();
17+
println!("Accepted client: {}", peer_addr);
18+
19+
// Spawn a task that echoes messages from the client back to it.
20+
spawn(echo(stream));
21+
}
22+
Ok(())
23+
}
24+
25+
#[cfg(target_os = "windows")]
26+
async fn run(addr: impl ToSocketAddrs) -> io::Result<()> {
27+
let listener = std::net::TcpListener::bind(addr).unwrap();
28+
println!("Listening on {}", listener.local_addr().unwrap());
29+
30+
// Accept clients in a loop.
31+
for stream in listener.incoming() {
32+
match stream {
33+
Ok(stream) => {
34+
println!("Accepted client");
35+
spawn!(echo(stream));
36+
}
37+
_ => {
38+
break;
39+
}
40+
}
41+
}
42+
Ok(())
43+
}
44+
45+
#[cfg(not(target_os = "windows"))]
646
async fn echo(stream: Handle<TcpStream>) -> io::Result<()> {
747
io::copy(&stream, &mut &stream).await?;
848
Ok(())
949
}
1050

51+
#[cfg(target_os = "windows")]
52+
async fn echo(mut stream: TcpStream) -> io::Result<()> {
53+
let mut buf = [0 as u8; 256];
54+
while let Ok(size) = stream.read(&mut buf) {
55+
stream.write(&buf[0..size]).unwrap();
56+
}
57+
Ok(())
58+
}
59+
1160
const TCP_SERVER_COUNT: usize = 10;
1261
static TCP_SERVERS: AtomicUsize = AtomicUsize::new(TCP_SERVER_COUNT);
1362

@@ -17,7 +66,8 @@ static TCP_SERVERS: AtomicUsize = AtomicUsize::new(TCP_SERVER_COUNT);
1766
/// Prologue:
1867
///
1968
/// This example demonstrates using 10 parallel tcp servers
20-
69+
/// non windows versions use io::copy from nuclei
70+
/// windows versions use a regular stream copy
2171
fn main() {
2272
env_logger::init();
2373

@@ -26,26 +76,14 @@ fn main() {
2676
let _tcp_servers = Bastion::children(|children: Children| {
2777
children
2878
.with_redundancy(TCP_SERVER_COUNT) // Let's have 10 tcp echo servers :)
29-
.with_exec(move |_ctx: BastionContext| {
30-
async move {
31-
println!("Server is starting!");
32-
let port = TCP_SERVERS.fetch_sub(1, Ordering::SeqCst) + 2000;
33-
let addr = format!("127.0.0.1:{}", port);
34-
35-
let listener = Handle::<TcpListener>::bind(addr).unwrap();
36-
println!("Listening on {}", listener.get_ref().local_addr().unwrap());
37-
38-
// Accept clients in a loop.
39-
loop {
40-
let (stream, peer_addr) = listener.accept().await.unwrap();
41-
println!("Accepted client: {}", peer_addr);
42-
43-
// Spawn a task that echoes messages from the client back to it.
44-
spawn(echo(stream));
45-
}
46-
47-
Ok(())
48-
}
79+
.with_exec(move |_ctx: BastionContext| async move {
80+
println!("Server is starting!");
81+
let port = TCP_SERVERS.fetch_sub(1, Ordering::SeqCst) + 2000;
82+
let addr = format!("127.0.0.1:{}", port);
83+
84+
run(addr);
85+
86+
Ok(())
4987
})
5088
})
5189
.expect("Couldn't start a new children group.");

src/bastion/src/bastion.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,14 @@ distributed_api! {
118118
/// // of reference to its elements...
119119
/// let children = Bastion::children(|children| {
120120
/// // ...
121-
/// # children
121+
/// # children.with_exec(|ctx: BastionContext| {
122+
/// # async move {
123+
/// # msg! { ctx.recv().await?,
124+
/// # _: _ => ();
125+
/// # }
126+
/// # Ok(())
127+
/// # }
128+
/// # })
122129
/// }).expect("Couldn't create the children group.");
123130
/// let elems: &[ChildRef] = children.elems();
124131
///

src/bastion/src/child_ref.rs

+26-8
Original file line numberDiff line numberDiff line change
@@ -226,14 +226,32 @@ impl ChildRef {
226226
/// # use bastion::prelude::*;
227227
/// #
228228
/// # Bastion::init();
229-
/// #
230-
/// # let children_ref = Bastion::children(|children| children).unwrap();
231-
/// # let child_ref = &children_ref.elems()[0];
232-
/// child_ref.stop().expect("Couldn't send the message.");
233-
/// #
234-
/// # Bastion::start();
235-
/// # Bastion::stop();
236-
/// # Bastion::block_until_stopped();
229+
/// # let children_ref =
230+
/// # Bastion::children(|children| {
231+
/// children.with_exec(|ctx: BastionContext| {
232+
/// async move {
233+
/// // ...which will receive the message asked...
234+
/// msg! { ctx.recv().await?,
235+
/// msg: &'static str =!> {
236+
/// // Handle the message...
237+
///
238+
/// // ...and eventually answer to it...
239+
/// };
240+
/// // This won't happen because this example
241+
/// // only "asks" a `&'static str`...
242+
/// _: _ => ();
243+
/// }
244+
///
245+
/// Ok(())
246+
/// }
247+
/// })
248+
/// }).expect("Couldn't create the children group.");
249+
/// # Bastion::start();
250+
/// # let child_ref = &children_ref.elems()[0];
251+
/// child_ref.stop().expect("Couldn't send the message.");
252+
/// #
253+
/// # Bastion::stop();
254+
/// # Bastion::block_until_stopped();
237255
/// ```
238256
pub fn stop(&self) -> Result<(), ()> {
239257
debug!("ChildRef({}): Stopping.", self.id);

src/bastion/src/children.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -887,7 +887,7 @@ impl Children {
887887
let children = self.as_ref();
888888
let supervisor = self.bcast.parent().clone().into_supervisor();
889889

890-
#[warn(unused_mut)]
890+
#[allow(unused_mut)]
891891
let mut state = ContextState::new();
892892
#[cfg(feature = "scaling")]
893893
self.init_data_for_scaling(&mut state);

src/bastion/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ pub mod context;
7979
pub mod dispatcher;
8080
pub mod envelope;
8181
pub mod executor;
82+
#[cfg(not(target_os = "windows"))]
8283
pub mod io;
8384
pub mod message;
8485
pub mod path;
@@ -106,6 +107,7 @@ pub mod prelude {
106107
DispatcherType, NotificationType,
107108
};
108109
pub use crate::envelope::{RefAddr, SignedMessage};
110+
#[cfg(not(target_os = "windows"))]
109111
pub use crate::io::*;
110112
pub use crate::message::{Answer, AnswerSender, Message, Msg};
111113
pub use crate::msg;

0 commit comments

Comments
 (0)