Skip to content

Commit 0c225da

Browse files
authored
Fixes error forwarding to client for failed command executions (#432)
Signed-off-by: Kailash Saravanan <kskeystone@gmail.com>
1 parent 75a98f2 commit 0c225da

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

native-link-scheduler/src/simple_scheduler.rs

+1
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,7 @@ impl SimpleSchedulerImpl {
510510

511511
// Re-queue the action or fail on max attempts.
512512
self.retry_action(&action_info, worker_id, err);
513+
self.tasks_or_workers_change_notify.notify_one();
513514
}
514515

515516
fn update_action(

native-link-scheduler/tests/simple_scheduler_test.rs

+58
Original file line numberDiff line numberDiff line change
@@ -1477,4 +1477,62 @@ mod scheduler_tests {
14771477

14781478
Ok(())
14791479
}
1480+
1481+
/// Regression test for: https://github.com/TraceMachina/native-link/issues/257.
1482+
#[tokio::test]
1483+
async fn ensure_task_or_worker_change_notification_received_test() -> Result<(), Error> {
1484+
const WORKER_ID1: WorkerId = WorkerId(0x0011_1111);
1485+
const WORKER_ID2: WorkerId = WorkerId(0x0022_2222);
1486+
1487+
let scheduler = SimpleScheduler::new_with_callback(
1488+
&native_link_config::schedulers::SimpleScheduler::default(),
1489+
|| async move {},
1490+
);
1491+
let action_digest = DigestInfo::new([99u8; 32], 512);
1492+
1493+
let mut rx_from_worker1 = setup_new_worker(&scheduler, WORKER_ID1, PlatformProperties::default()).await?;
1494+
let mut client_rx = setup_action(
1495+
&scheduler,
1496+
action_digest,
1497+
PlatformProperties::default(),
1498+
make_system_time(1),
1499+
)
1500+
.await?;
1501+
1502+
let mut rx_from_worker2 = setup_new_worker(&scheduler, WORKER_ID2, PlatformProperties::default()).await?;
1503+
1504+
{
1505+
// Other tests check full data. We only care if we got StartAction.
1506+
match rx_from_worker1.recv().await.unwrap().update {
1507+
Some(update_for_worker::Update::StartAction(_)) => { /* Success */ }
1508+
v => panic!("Expected StartAction, got : {v:?}"),
1509+
}
1510+
// Other tests check full data. We only care if client thinks we are Executing.
1511+
assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing);
1512+
}
1513+
1514+
scheduler
1515+
.update_action_with_internal_error(
1516+
&WORKER_ID1,
1517+
&ActionInfoHashKey {
1518+
instance_name: INSTANCE_NAME.to_string(),
1519+
digest: action_digest,
1520+
salt: 0,
1521+
},
1522+
make_err!(Code::NotFound, "Some error"),
1523+
)
1524+
.await;
1525+
1526+
tokio::task::yield_now().await; // Allow task<->worker matcher to run.
1527+
1528+
// Now connect a new worker and it should pickup the action.
1529+
{
1530+
// Other tests check full data. We only care if we got StartAction.
1531+
rx_from_worker2.recv().await.err_tip(|| "worker went away")?;
1532+
// Other tests check full data. We only care if client thinks we are Executing.
1533+
assert_eq!(client_rx.borrow_and_update().stage, ActionStage::Executing);
1534+
}
1535+
1536+
Ok(())
1537+
}
14801538
}

0 commit comments

Comments
 (0)