Skip to content

Commit 9ab2fb0

Browse files
authored
Config file (#21)
* create and read the broker config file * create the namespaces with the policies values from config * adapt the dockerfile and makefile for config argument * adapt the github worflow as well, to use config file * update path to config
1 parent 852fbaa commit 9ab2fb0

File tree

10 files changed

+224
-134
lines changed

10 files changed

+224
-134
lines changed

.github/workflows/pull-request-tests.yml

+13-3
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,23 @@ jobs:
6969
env:
7070
RUST_LOG: danube_broker=trace
7171
run: |
72-
docker run --network host -d --name broker1 danube_broker_image:latest --broker-addr "0.0.0.0:6650" --cluster-name "MY_CLUSTER" --meta-store-addr "127.0.0.1:2379" --prom-exporter "0.0.0.0:3000"
73-
72+
docker run --network host -d --name broker1 \
73+
danube_broker_image:latest \
74+
--config-file "/etc/danube_broker.yml" \
75+
--broker-addr "0.0.0.0:6650" \
76+
--admin-addr "0.0.0.0:50051" \
77+
--prom-exporter "0.0.0.0:3000"
78+
7479
- name: Run Broker 2
7580
env:
7681
RUST_LOG: danube_broker=trace
7782
run: |
78-
docker run --network host -d --name broker2 danube_broker_image:latest --broker-addr "0.0.0.0:6651" --cluster-name "MY_CLUSTER" --meta-store-addr "127.0.0.1:2379"
83+
docker run --network host -d --name broker2 \
84+
danube_broker_image:latest \
85+
--config-file "/etc/danube_broker.yml" \
86+
--broker-addr "0.0.0.0:6651" \
87+
--admin-addr "0.0.0.0:50052" \
88+
--prom-exporter "0.0.0.0:3001"
7989
8090
- name: Wait for Brokers to be Ready
8191
run: |

Dockerfile

+4-1
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@ RUN apt-get update && apt-get install -y protobuf-compiler
3232
# Copy the compiled binary from the builder stage
3333
COPY --from=builder /app/target/release/danube-broker /usr/local/bin/danube-broker
3434

35+
# Copy the configuration file into the container (adjust the path if needed)
36+
COPY config/danube_broker.yml /etc/danube_broker.yml
37+
3538
# Expose the ports your broker listens on
3639
EXPOSE 6650 6651
3740

3841
# Define entrypoint and default command
3942
ENTRYPOINT ["/usr/local/bin/danube-broker"]
40-
CMD ["--broker-addr", "0.0.0.0:6650", "--cluster-name", "MY_CLUSTER", "--meta-store-addr", "0.0.0.0:2379"]
43+
CMD ["--config-file", "/etc/danube_broker.yml", "--broker-addr", "0.0.0.0:6650"]

Makefile

+24-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
.DEFAULT_GOAL := no_target_specified
22

3-
# Define broker ports
4-
BROKER_PORTS := 6650 6651 6652
3+
# Base ports for broker_addr, admin_addr, and prom_exporter
4+
BASE_BROKER_PORT := 6650
5+
BASE_ADMIN_PORT := 50051
6+
BASE_PROM_PORT := 9040
7+
8+
# Number of broker instances
9+
NUM_BROKERS := 3
10+
11+
# Path to configuration file
12+
CONFIG_FILE := ./config/danube_broker.yml
513

614
# ETCD configuration
715
ETCD_NAME := etcd-danube
@@ -12,9 +20,6 @@ ETCD_PORT := 2379
1220
HAPROXY_CONFIG := haproxy.cfg
1321
HAPROXY_PORT := 50051
1422

15-
# Rust build command
16-
#BUILD_CMD := cargo build --release
17-
1823
.PHONY: all brokers etcd haproxy etcd-clean brokers-clean haproxy-clean
1924

2025
no_target_specified:
@@ -53,18 +58,26 @@ LOG_LEVEL = $(if $(RUST_LOG),$(RUST_LOG),info)
5358

5459
brokers:
5560
@echo "Building Danube brokers..."
56-
@for port in $(BROKER_PORTS); do \
57-
log_file="broker_$$port.log"; \
58-
echo "Starting broker on port $$port, logging to $$log_file"; \
61+
@for i in $(shell seq 0 $(shell echo $(NUM_BROKERS) - 1 | bc)); do \
62+
broker_port=$$(($(BASE_BROKER_PORT) + i)); \
63+
admin_port=$$(($(BASE_ADMIN_PORT) + i)); \
64+
prom_port=$$(($(BASE_PROM_PORT) + i)); \
65+
log_file="broker_$$broker_port.log"; \
66+
echo "Starting broker on broker port $$broker_port, admin port $$admin_port, prometheus port $$prom_port, logging to $$log_file"; \
5967
RUST_LOG=$(LOG_LEVEL) RUST_BACKTRACE=1 cargo build --release --package danube-broker --bin danube-broker && \
60-
RUST_LOG=$(LOG_LEVEL) RUST_BACKTRACE=1 ./target/release/danube-broker --broker-addr "0.0.0.0:$$port" --cluster-name "MY_CLUSTER" --meta-store-addr "0.0.0.0:2379" > temp/$$log_file 2>&1 & \
68+
RUST_LOG=$(LOG_LEVEL) RUST_BACKTRACE=1 ./target/release/danube-broker \
69+
--config-file $(CONFIG_FILE) \
70+
--broker-addr "0.0.0.0:$$broker_port" \
71+
--admin-addr "0.0.0.0:$$admin_port" \
72+
--prom-exporter "0.0.0.0:$$prom_port" \
73+
> temp/$$log_file 2>&1 & \
6174
sleep 2; \
6275
done
63-
@echo "Danube brokers started on ports: $(BROKER_PORTS)"
76+
@echo "Danube brokers started on ports starting from $(BASE_BROKER_PORT)"
6477

6578
brokers-clean:
6679
@echo "Cleaning up Brokers instances..."
67-
@pids=$$(ps aux | grep '[d]anube-broker --broker-addr \0.0.0.0:'); \
80+
@pids=$$(ps aux | grep '[d]anube-broker --config-file'); \
6881
if [ -n "$$pids" ]; then \
6982
echo "$$pids" | awk '{print $$2}' | xargs -r kill; \
7083
echo "Danube brokers cleaned up."; \

config/danube_broker.yml

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Danube cluster name
2+
cluster_name: "MY_CLUSTER"
3+
4+
# Danube Broker data address
5+
broker_addr: "0.0.0.0:6650"
6+
7+
# Danube Broker admin api address
8+
admin_addr: "0.0.0.0:50051"
9+
10+
# Metadata Persistent Store address
11+
meta_store_addr: "127.0.0.1:2379"
12+
13+
# Prometheus metrics exporter
14+
prom_exporter: "0.0.0.0:9040"
15+
16+
# Namespaces to be created on boot
17+
bootstrap_namespaces:
18+
- "default"
19+
20+
# Broker policies, that can be overwritten by namespace / topic policies
21+
policies:
22+
# Limits the maximum number of producers that can simultaneously publish messages to a specific topic.
23+
# Default is 0, unlimited.
24+
max_producers_per_topic: 0
25+
26+
# Limits the maximum number of subscriptions that can be created on the topic.
27+
# Default is 0, unlimited.
28+
max_subscriptions_per_topic: 0
29+
30+
# Limits the maximum number of consumers that can simultaneously consume messages from a specific topic.
31+
# Default is 0, unlimited.
32+
max_consumers_per_topic: 0
33+
34+
# Limits the maximum number of consumers that can simultaneously use a single subscription on a topic.
35+
# Default is 0, unlimited.
36+
max_consumers_per_subscription: 0
37+
38+
# Defines the Max publish rate (number of messages and/or bytes per second) for producers publishing to the topic.
39+
# Default is 0, unlimited.
40+
max_publish_rate: 0
41+
42+
# Defines the Max dispatch rate (number of messages and/or bytes per second) for the topic.
43+
# Default is 0, unlimited.
44+
max_dispatch_rate: 0
45+
46+
# Defines the dispatch rate for each subscription on the topic.
47+
# Default is 0, unlimited.
48+
max_subscription_dispatch_rate: 0
49+
50+
# Limits the maximum size of a single message that can be published to the topic.
51+
# Default is 10 MB
52+
max_message_size: 10485760 # in bytes which means 10 MB

danube-broker/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ futures = "0.3.30"
2626
base64 = "0.22.1"
2727
metrics = "0.23.0"
2828
metrics-exporter-prometheus = "0.15.3"
29+
serde_yaml = "0.9.34"
2930

3031
[build-dependencies]
3132
tonic-build = "0.11"

danube-broker/src/danube_service.rs

+27-13
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{
2424
broker_server,
2525
broker_service::BrokerService,
2626
metadata_store::{etcd_watch_prefixes, MetaOptions, MetadataStorage, MetadataStore},
27+
policies::Policies,
2728
resources::{
2829
Resources, BASE_BROKER_LOAD_PATH, BASE_BROKER_PATH, DEFAULT_NAMESPACE, SYSTEM_NAMESPACE,
2930
},
@@ -144,10 +145,20 @@ impl DanubeService {
144145
.await?;
145146

146147
//create the default Namespace
147-
create_namespace_if_absent(&mut self.resources, DEFAULT_NAMESPACE).await?;
148+
create_namespace_if_absent(
149+
&mut self.resources,
150+
DEFAULT_NAMESPACE,
151+
&self.service_config.policies,
152+
)
153+
.await?;
148154

149155
//create system Namespace
150-
create_namespace_if_absent(&mut self.resources, SYSTEM_NAMESPACE).await?;
156+
create_namespace_if_absent(
157+
&mut self.resources,
158+
SYSTEM_NAMESPACE,
159+
&self.service_config.policies,
160+
)
161+
.await?;
151162

152163
//create system topic
153164
if !self.resources.topic.topic_exists(SYSTEM_TOPIC).await? {
@@ -156,7 +167,12 @@ impl DanubeService {
156167

157168
//create bootstrap namespaces
158169
for namespace in &self.service_config.bootstrap_namespaces {
159-
create_namespace_if_absent(&mut self.resources, &namespace).await?;
170+
create_namespace_if_absent(
171+
&mut self.resources,
172+
&namespace,
173+
&self.service_config.policies,
174+
)
175+
.await?;
160176
}
161177

162178
info!("cluster metadata setup completed");
@@ -166,11 +182,9 @@ impl DanubeService {
166182

167183
// Start the Broker GRPC server
168184
//==========================================================================
185+
let broker_addr: std::net::SocketAddr = self.service_config.broker_addr.parse()?;
169186

170-
let grpc_server = broker_server::DanubeServerImpl::new(
171-
self.broker.clone(),
172-
self.service_config.broker_addr,
173-
);
187+
let grpc_server = broker_server::DanubeServerImpl::new(self.broker.clone(), broker_addr);
174188

175189
// Create a oneshot channel for readiness signaling
176190
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
@@ -246,13 +260,12 @@ impl DanubeService {
246260
// Start the Danube Admin GRPC server
247261
//==========================================================================
248262

263+
let admin_addr: std::net::SocketAddr = self.service_config.admin_addr.parse()?;
264+
249265
let broker_service_cloned = Arc::clone(&self.broker);
250266

251-
let admin_server = DanubeAdminImpl::new(
252-
self.service_config.admin_addr,
253-
broker_service_cloned,
254-
self.resources.clone(),
255-
);
267+
let admin_server =
268+
DanubeAdminImpl::new(admin_addr, broker_service_cloned, self.resources.clone());
256269

257270
let admin_handle: tokio::task::JoinHandle<()> = admin_server.start().await;
258271

@@ -369,11 +382,12 @@ impl DanubeService {
369382
pub(crate) async fn create_namespace_if_absent(
370383
resources: &mut Resources,
371384
namespace_name: &str,
385+
policies: &Policies,
372386
) -> Result<()> {
373387
if !resources.namespace.namespace_exist(namespace_name).await? {
374388
resources
375389
.namespace
376-
.create_namespace(namespace_name, None)
390+
.create_namespace(namespace_name, Some(policies))
377391
.await?;
378392
} else {
379393
info!("Namespace {} already exists.", namespace_name);

danube-broker/src/main.rs

+32-48
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ mod subscription;
1717
mod topic;
1818
mod utils;
1919

20-
use std::sync::Arc;
20+
use std::{fs::read_to_string, path::Path, sync::Arc};
2121

2222
use crate::{
2323
broker_metrics::init_metrics,
@@ -45,68 +45,58 @@ pub(crate) mod admin_proto {
4545
#[derive(Parser, Debug)]
4646
#[command(version, about, long_about = None)]
4747
struct Args {
48-
/// Cluster Name
49-
#[arg(short = 'x', long)]
50-
cluster_name: String,
51-
52-
/// Path to config file
48+
/// Path to config file (required)
5349
#[arg(short = 'c', long)]
54-
config_file: Option<String>,
50+
config_file: String,
5551

56-
/// Danube Broker advertised address
57-
#[arg(short = 'b', long, default_value = "0.0.0.0:6650")]
58-
broker_addr: String,
52+
/// Danube Broker advertised address (optional, overrides config file)
53+
#[arg(short = 'b', long)]
54+
broker_addr: Option<String>,
5955

60-
/// Danube Admin address
61-
#[arg(short = 'a', long, default_value = "0.0.0.0:50051")]
62-
admin_addr: String,
56+
/// Danube Broker Admin address (optional, overrides config file)
57+
#[arg(short = 'a', long)]
58+
admin_addr: Option<String>,
6359

64-
/// Prometheus exporter address
60+
/// Prometheus Exporter http address (optional, overrides config file)
6561
#[arg(short = 'p', long)]
6662
prom_exporter: Option<String>,
67-
68-
/// Metadata store address
69-
#[arg(short = 'm', long)]
70-
meta_store_addr: Option<String>,
71-
72-
/// List of namespaces (comma-separated)
73-
#[arg(short = 'n', long, value_parser = parse_namespaces_list)]
74-
namespaces: Vec<String>,
7563
}
7664

7765
#[tokio::main]
7866
async fn main() -> Result<()> {
79-
// install global collector configured based on RUST_LOG env var.
80-
//tracing_subscriber::fmt()
81-
// .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
82-
// .init();
83-
67+
// Initialize logging
8468
tracing_subscriber::fmt::init();
8569

70+
// Parse command line arguments
8671
let args = Args::parse();
8772

88-
let broker_addr: std::net::SocketAddr = args.broker_addr.parse()?;
89-
let admin_addr: std::net::SocketAddr = args.admin_addr.parse()?;
73+
// Load the configuration from the specified YAML file
74+
let config_content = read_to_string(Path::new(&args.config_file))?;
75+
let mut service_config: ServiceConfiguration = serde_yaml::from_str(&config_content)?;
9076

91-
// init metrics with or without prometheus exporter
92-
if let Some(prometheus_exporter) = args.prom_exporter {
77+
// If `broker_addr` is provided via command-line args, override the value from the config file
78+
if let Some(broker_addr) = args.broker_addr {
79+
service_config.broker_addr = broker_addr;
80+
}
81+
82+
// If `admin_addr` is provided via command-line args, override the value from the config file
83+
if let Some(admin_addr) = args.admin_addr {
84+
service_config.admin_addr = admin_addr;
85+
}
86+
87+
// If `prom_exporter` is provided via command-line args, override the value from the config file
88+
if let Some(prom_exporter) = args.prom_exporter {
89+
service_config.prom_exporter = Some(prom_exporter);
90+
}
91+
92+
// Init metrics with or without prometheus exporter
93+
if let Some(prometheus_exporter) = service_config.prom_exporter.clone() {
9394
let prom_addr: std::net::SocketAddr = prometheus_exporter.parse()?;
9495
init_metrics(Some(prom_addr));
9596
} else {
9697
init_metrics(None)
9798
}
9899

99-
// configuration settings for a Danube broker service
100-
// includes various parameters that control the behavior and performance of the broker
101-
// TODO! read from a config file, like danube.conf
102-
let service_config = ServiceConfiguration {
103-
cluster_name: args.cluster_name,
104-
broker_addr: broker_addr,
105-
admin_addr: admin_addr,
106-
meta_store_addr: args.meta_store_addr,
107-
bootstrap_namespaces: args.namespaces,
108-
};
109-
110100
// initialize the storage layer for Danube Metadata
111101
let store_config = MetadataStoreConfig::new();
112102
let metadata_store: MetadataStorage =
@@ -166,9 +156,3 @@ async fn main() -> Result<()> {
166156

167157
Ok(())
168158
}
169-
170-
fn parse_namespaces_list(s: &str) -> Result<Vec<String>> {
171-
Ok(s.split(',')
172-
.map(|nam| nam.trim().to_string())
173-
.collect::<Vec<String>>())
174-
}

0 commit comments

Comments
 (0)