Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional config options #657

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/arroyo-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ use crate::{connection_tables, to_micros};
use arroyo_rpc::config::config;
use cornucopia_async::{Database, DatabaseSource};

const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(10);

async fn compile_sql<'a>(
query: String,
local_udfs: &Vec<Udf>,
Expand Down Expand Up @@ -511,7 +509,7 @@ pub async fn create_pipeline(
let checkpoint_interval = pipeline_post
.checkpoint_interval_micros
.map(Duration::from_micros)
.unwrap_or(DEFAULT_CHECKPOINT_INTERVAL);
.unwrap_or(*config().default_checkpoint_interval);

let job_id = jobs::create_job(
&pipeline_post.name,
Expand Down
5 changes: 2 additions & 3 deletions crates/arroyo-controller/src/job_controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub mod job_metrics;
const CHECKPOINTS_TO_KEEP: u32 = 4;
const CHECKPOINT_ROWS_TO_KEEP: u32 = 100;
const COMPACT_EVERY: u32 = 2;
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);

#[derive(Debug, PartialEq, Eq)]
pub enum WorkerState {
Expand All @@ -58,7 +57,7 @@ pub struct WorkerStatus {

impl WorkerStatus {
fn heartbeat_timeout(&self) -> bool {
self.last_heartbeat.elapsed() > HEARTBEAT_TIMEOUT
self.last_heartbeat.elapsed() > *config().pipeline.worker_heartbeat_timeout
}
}

Expand Down Expand Up @@ -369,7 +368,7 @@ impl RunningJobModel {
}

async fn compact_state(&mut self) -> anyhow::Result<()> {
if !config().controller.compaction.enabled {
if !config().pipeline.compaction.enabled {
info!("Compaction is disabled, skipping compaction");
return Ok(());
}
Expand Down
12 changes: 4 additions & 8 deletions crates/arroyo-controller/src/states/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@ use serde_json::json;

use super::{JobContext, State, Transition};

// after this amount of time, we consider the job to be healthy and reset the restarts counter
const HEALTHY_DURATION: Duration = Duration::from_secs(2 * 60);

// how many times we allow the job to restart before moving it to failed
const RESTARTS_ALLOWED: usize = 10;

#[derive(Debug)]
pub struct Running {}

Expand All @@ -36,6 +30,8 @@ impl State for Running {
async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
stop_if_desired_running!(self, ctx.config);

let pipeline_config = &config().clone().pipeline;

let running_start = Instant::now();

let mut log_interval = tokio::time::interval(Duration::from_secs(60));
Expand Down Expand Up @@ -92,7 +88,7 @@ impl State for Running {
}
}
_ = tokio::time::sleep(Duration::from_millis(200)) => {
if ctx.status.restarts > 0 && running_start.elapsed() > HEALTHY_DURATION {
if ctx.status.restarts > 0 && running_start.elapsed() > *pipeline_config.healthy_duration {
let restarts = ctx.status.restarts;
ctx.status.restarts = 0;
if let Err(e) = ctx.status.update_db(&ctx.db).await {
Expand Down Expand Up @@ -120,7 +116,7 @@ impl State for Running {
"job_id": ctx.config.id,
"error": format!("{:?}", err),
}));
if ctx.status.restarts >= RESTARTS_ALLOWED as i32 {
if pipeline_config.allowed_restarts != -1 && ctx.status.restarts >= pipeline_config.allowed_restarts {
return Err(fatal(
"Job has restarted too many times",
err
Expand Down
21 changes: 11 additions & 10 deletions crates/arroyo-controller/src/states/scheduling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ use crate::{

use super::{running::Running, JobContext, State, Transition};

const WORKER_STARTUP_TIME: Duration = Duration::from_secs(10 * 60);
const TASK_STARTUP_TIME: Duration = Duration::from_secs(2 * 60);

#[derive(Debug, Clone)]
struct WorkerStatus {
id: WorkerId,
Expand Down Expand Up @@ -189,7 +186,7 @@ impl Scheduling {
slots_for_job = slots_needed,
slots_needed = s
);
if start.elapsed() > WORKER_STARTUP_TIME {
if start.elapsed() > *config().pipeline.worker_startup_time {
return Err(fatal(
"Not enough slots to schedule job",
anyhow!("scheduler error -- needed {} slots", slots_needed),
Expand Down Expand Up @@ -244,10 +241,13 @@ impl State for Scheduling {
let worker_connects = Arc::new(Mutex::new(HashMap::new()));
let mut handles = vec![];

let config = &config().pipeline;

let start = Instant::now();
loop {
let timeout = WORKER_STARTUP_TIME
.min(ctx.config.ttl.unwrap_or(WORKER_STARTUP_TIME))
let timeout = config
.worker_startup_time
.min(ctx.config.ttl.unwrap_or(*config.worker_startup_time))
.checked_sub(start.elapsed())
.unwrap_or(Duration::ZERO);

Expand All @@ -268,7 +268,7 @@ impl State for Scheduling {
_ = tokio::time::sleep(timeout) => {
return Err(ctx.retryable(self,
"timed out while waiting for workers to start",
anyhow!("timed out after {:?} while waiting for worker startup", WORKER_STARTUP_TIME), 3));
anyhow!("timed out after {:?} while waiting for worker startup", *config.worker_startup_time), 3));
}
}

Expand Down Expand Up @@ -499,8 +499,9 @@ impl State for Scheduling {
let start = Instant::now();
let mut started_tasks = HashSet::new();
while started_tasks.len() < ctx.program.task_count() {
let timeout = TASK_STARTUP_TIME
.min(ctx.config.ttl.unwrap_or(TASK_STARTUP_TIME))
let timeout = config
.task_startup_time
.min(ctx.config.ttl.unwrap_or(*config.task_startup_time))
.checked_sub(start.elapsed())
.unwrap_or(Duration::ZERO);

Expand Down Expand Up @@ -528,7 +529,7 @@ impl State for Scheduling {
_ = tokio::time::sleep(timeout) => {
return Err(ctx.retryable(self,
"timed out while waiting for tasks to start",
anyhow!("timed out after {:?} while waiting for worker startup", TASK_STARTUP_TIME), 3));
anyhow!("timed out after {:?} while waiting for worker startup", *config.task_startup_time), 3));
}
}
}
Expand Down
26 changes: 17 additions & 9 deletions crates/arroyo-rpc/default.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
checkpoint-url = "/tmp/arroyo/checkpoints"
default-checkpoint-interval = "10s"

[pipeline]
source-batch-size = 512
source-batch-linger = "100ms"
update-aggregate-flush-interval = "1s"
allowed-restarts = 20
worker-heartbeat-timeout = "30s"
healthy-duration = "2m"
worker-startup-time = "10m"
task-startup-time = "2m"

[pipeline.compaction]
enabled = false
checkpoints-to-compact = 4

# Services

Expand All @@ -11,11 +26,6 @@ bind-address = "0.0.0.0"
rpc-port = 9190
scheduler = "process"

[controller.compaction]
enabled = false
checkpoints-to-compact = 4


[compiler]
bind-address = "0.0.0.0"
rpc-port = 9191
Expand All @@ -40,10 +50,6 @@ task-slots = 16
bind-address = "0.0.0.0"
http-port = 8001

[pipeline]
source-batch-size = 512
source-batch-linger = "100ms"
update-aggregate-flush-interval = "1s"

# Schedulers

Expand All @@ -63,6 +69,8 @@ resources = { requests = { cpu = "900m", memory = "500Mi" } }
task-slots = 16
command = "/app/arroyo-bin worker"

# other

[database]
type = "postgres"

Expand Down
24 changes: 21 additions & 3 deletions crates/arroyo-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ pub struct Config {
/// URL of an object store or filesystem for storing checkpoints
pub checkpoint_url: String,

/// Default interval for checkpointing
pub default_checkpoint_interval: HumanReadableDuration,

/// The endpoint of the controller, used by other services to connect to it. This must be set
/// if running the controller on a separate machine from the other services or on a separate
/// process with a non-standard port.
Expand Down Expand Up @@ -273,8 +276,6 @@ pub struct ControllerConfig {

/// The scheduler to use
pub scheduler: Scheduler,

pub compaction: CompactionConfig,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -381,8 +382,25 @@ pub struct PipelineConfig {
/// Batch linger time (how long to wait before flushing)
pub source_batch_linger: HumanReadableDuration,

// How often to flush aggregates
/// How often to flush aggregates
pub update_aggregate_flush_interval: HumanReadableDuration,

/// How many restarts to allow before moving to failed (-1 for infinite)
pub allowed_restarts: i32,

/// After this amount of time, we consider the job to be healthy and reset the restarts counter
pub healthy_duration: HumanReadableDuration,

/// Amount of time to wait for a worker heartbeat before considering the worker dead
pub worker_heartbeat_timeout: HumanReadableDuration,

/// Amount of time to wait for workers to start up before considering them failed
pub worker_startup_time: HumanReadableDuration,

/// Amount of time to wait for tasks to start up before considering them failed
pub task_startup_time: HumanReadableDuration,

pub compaction: CompactionConfig,
}

#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion crates/arroyo-state/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ParquetBackend {
operator_id: String,
epoch: u32,
) -> Result<HashMap<String, TableCheckpointMetadata>> {
let min_files_to_compact = config().controller.compaction.checkpoints_to_compact as usize;
let min_files_to_compact = config().pipeline.compaction.checkpoints_to_compact as usize;

let operator_checkpoint_metadata =
Self::load_operator_metadata(&job_id, &operator_id, epoch)
Expand Down
2 changes: 1 addition & 1 deletion k8s/arroyo/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ maintainers:
email: support@arroyo.systems
url: https://arroyo.dev

icon: https://raw.githubusercontent.com/ArroyoSystems/arroyo/master/images/arroyo_logo.png
icon: https://raw.githubusercontent.com/ArroyoSystems/arroyo/afc004609351f6e4cd2c1f467bb7bf33ad044ccd/images/arroyo_logo.png
4 changes: 0 additions & 4 deletions k8s/arroyo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ Arroyo control plane. This is the easiest way to get a production quality Arroyo
See the [docs](https://doc.arroyo.dev/deployment/kubernetes) for full information on how to use this helm chart.

Each version of the helm chart is associated by default with a particular release of Arroyo. The latest release
is [0.10.3](https://www.arroyo.dev/blog/arroyo-0-10-0).

## Quickstart

Expand Down
Loading