Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition in integ test that was leading to frequent failures #775

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 37 additions & 25 deletions crates/integ/tests/api_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ use tracing::info;

async fn wait_for_state(
client: &Client,
run_id: Option<i64>,
pipeline_id: &str,
expected_state: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<i64> {
let mut last_state = "None".to_string();
while last_state != expected_state {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;

let jobs = client
.get_pipeline_jobs()
.id(pipeline_id)
Expand All @@ -30,6 +33,10 @@ async fn wait_for_state(
.unwrap();
let job = jobs.data.first().unwrap();

if Some(job.run_id) == run_id {
continue;
}

let state = job.state.clone();
if last_state != state {
info!("Job transitioned to {}", state);
Expand All @@ -40,10 +47,10 @@ async fn wait_for_state(
bail!("Job transitioned to failed");
}

tokio::time::sleep(Duration::from_millis(100)).await;
if last_state == expected_state {
return Ok(job.run_id);
}
}

Ok(())
}

fn get_client() -> Arc<Client> {
Expand All @@ -66,8 +73,8 @@ fn get_client() -> Arc<Client> {
.clone()
}

async fn start_pipeline(run_id: u32, query: &str, udfs: &[&str]) -> anyhow::Result<String> {
let pipeline_name = format!("pipeline_{}", run_id);
async fn start_pipeline(test_id: u32, query: &str, udfs: &[&str]) -> anyhow::Result<String> {
let pipeline_name = format!("pipeline_{}", test_id);
info!("Creating pipeline {}", pipeline_name);

let pipeline_id = get_client()
Expand All @@ -94,21 +101,21 @@ async fn start_pipeline(run_id: u32, query: &str, udfs: &[&str]) -> anyhow::Resu
}

async fn start_and_monitor(
run_id: u32,
test_id: u32,
query: &str,
udfs: &[&str],
checkpoints_to_wait: u32,
) -> anyhow::Result<(String, String)> {
) -> anyhow::Result<(String, String, i64)> {
let api_client = get_client();

println!("Starting pipeline");
let pipeline_id = start_pipeline(run_id, query, udfs)
let pipeline_id = start_pipeline(test_id, query, udfs)
.await
.expect("failed to start pipeline");

// wait for job to enter running phase
println!("Waiting until running");
wait_for_state(&api_client, &pipeline_id, "Running")
let run_id = wait_for_state(&api_client, None, &pipeline_id, "Running")
.await
.unwrap();

Expand Down Expand Up @@ -151,7 +158,7 @@ async fn start_and_monitor(

assert!(!details.data.is_empty());

return Ok((pipeline_id, job.id.clone()));
return Ok((pipeline_id, job.id.clone(), run_id));
}
}

Expand All @@ -161,9 +168,10 @@ async fn start_and_monitor(

async fn patch_and_wait(
pipeline_id: &str,
run_id: Option<i64>,
body: builder::PipelinePatch,
expected_state: &str,
) -> anyhow::Result<()> {
) -> anyhow::Result<i64> {
println!("Patching with {:?}", body);
get_client()
.patch_pipeline()
Expand All @@ -173,9 +181,7 @@ async fn patch_and_wait(
.await?;

println!("Waiting for {}", expected_state);
wait_for_state(&get_client(), pipeline_id, expected_state).await?;

Ok(())
wait_for_state(&get_client(), run_id, pipeline_id, expected_state).await
}

#[tokio::test]
Expand All @@ -184,8 +190,8 @@ async fn basic_pipeline() {

// create a source
println!("Creating source");
let run_id: u32 = random();
let source_name = format!("source_{}", run_id);
let test_id: u32 = random();
let source_name = format!("source_{}", test_id);

let source_id = api_client
.create_connection_table()
Expand Down Expand Up @@ -222,7 +228,7 @@ async fn basic_pipeline() {
assert_eq!(valid.errors, Vec::<String>::new());
assert!(valid.graph.is_some());

let (pipeline_id, job_id) = start_and_monitor(run_id, &query, &[], 10).await.unwrap();
let (pipeline_id, job_id, _) = start_and_monitor(test_id, &query, &[], 10).await.unwrap();

// get error messages
let errors = api_client
Expand Down Expand Up @@ -268,17 +274,19 @@ async fn basic_pipeline() {
}

// stop job
patch_and_wait(
let run_id = patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Checkpoint),
"Stopped",
)
.await
.unwrap();

// start job
patch_and_wait(
let run_id = patch_and_wait(
&pipeline_id,
Some(run_id),
PipelinePatch::builder().stop(StopType::None),
"Running",
)
Expand All @@ -287,8 +295,9 @@ async fn basic_pipeline() {

// rescale job
println!("Rescaling pipeline");
patch_and_wait(
let run_id = patch_and_wait(
&pipeline_id,
Some(run_id),
PipelinePatch::builder().parallelism(2),
"Running",
)
Expand Down Expand Up @@ -317,13 +326,14 @@ async fn basic_pipeline() {
.await
.unwrap();

wait_for_state(&api_client, &pipeline_id, "Running")
wait_for_state(&api_client, Some(run_id), &pipeline_id, "Running")
.await
.unwrap();

// stop job
patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Immediate),
"Stopped",
)
Expand Down Expand Up @@ -387,11 +397,12 @@ select my_double(cast(counter as bigint)) from impulse;

let run_id: u32 = random();

let (pipeline_id, _job_id) = start_and_monitor(run_id, query, &[udf], 3).await.unwrap();
let (pipeline_id, _job_id, _) = start_and_monitor(run_id, query, &[udf], 3).await.unwrap();

// stop job
patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Checkpoint),
"Stopped",
)
Expand Down Expand Up @@ -601,7 +612,7 @@ async fn connection_table() {
])
);

let (pipeline_id, _) = start_and_monitor(
let (pipeline_id, _, _) = start_and_monitor(
run_id,
&format!("select * from {};", connection_table.name),
&[],
Expand All @@ -613,6 +624,7 @@ async fn connection_table() {
// stop job
patch_and_wait(
&pipeline_id,
None,
PipelinePatch::builder().stop(StopType::Immediate),
"Stopped",
)
Expand Down
Loading