Skip to content

Commit f525ac3

Browse files
authored
Devkelley/find agemo w chariott (eclipse-ibeji#53)
* Add chariott support for Managed Subscribe * Update README with how to run chariott * fixed spacing errors * fixed grammar * Add enum for service's source * updated readme and fixed whitespace issues * minor change to discover_service_using_chariott * Added comments for structs and enum definitions
1 parent 73db225 commit f525ac3

File tree

7 files changed

+245
-43
lines changed

7 files changed

+245
-43
lines changed

core/common/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ parking_lot = { workspace = true }
2424
prost = { workspace = true }
2525
serde = { workspace = true, features = ["derive"] }
2626
serde_derive = { workspace = true }
27+
strum = { workspace = true }
28+
strum_macros = { workspace = true }
2729
regex = {workspace = true }
2830
tokio = { workspace = true }
2931
tonic = { workspace = true }

core/common/src/utils.rs

+117-7
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,49 @@
22
// Licensed under the MIT license.
33
// SPDX-License-Identifier: MIT
44

5-
use config::{Config, File, FileFormat};
6-
use log::debug;
5+
use config::{Config, ConfigError, File, FileFormat};
6+
use core_protobuf_data_access::chariott::service_discovery::core::v1::{
7+
service_registry_client::ServiceRegistryClient, DiscoverRequest,
8+
};
9+
use log::{debug, info};
10+
use serde_derive::Deserialize;
711
use std::future::Future;
12+
use strum_macros::Display;
813
use tokio::time::{sleep, Duration};
14+
use tonic::{Request, Status};
15+
16+
/// An identifier used when discovering a service through Chariott.
17+
#[derive(Debug, Deserialize)]
18+
pub struct ServiceIdentifier {
19+
/// The namespace of the service.
20+
pub namespace: String,
21+
/// The name of the service.
22+
pub name: String,
23+
/// The version of the service.
24+
pub version: String,
25+
}
26+
27+
/// An enum representing where to discover a service's URI.
28+
#[derive(Display, Debug, Deserialize)]
29+
pub enum ServiceUriSource {
30+
/// Use the local configuration settings to find the service's URI.
31+
Local { service_uri: String },
32+
/// Use Chariott to discover the service's URI.
33+
Chariott { chariott_uri: String, service_identifier: ServiceIdentifier },
34+
}
935

1036
/// Load the settings.
1137
///
1238
/// # Arguments
1339
/// * `config_filename` - Name of the config file to load settings from.
14-
pub fn load_settings<T>(config_filename: &str) -> T
40+
pub fn load_settings<T>(config_filename: &str) -> Result<T, ConfigError>
1541
where
1642
T: for<'de> serde::Deserialize<'de>,
1743
{
1844
let config =
19-
Config::builder().add_source(File::new(config_filename, FileFormat::Yaml)).build().unwrap();
20-
21-
let settings: T = config.try_deserialize().unwrap();
45+
Config::builder().add_source(File::new(config_filename, FileFormat::Yaml)).build()?;
2246

23-
settings
47+
config.try_deserialize()
2448
}
2549

2650
/// Retry a function that returns an error.
@@ -64,6 +88,92 @@ where
6488
last_error
6589
}
6690

91+
/// Use Chariott to discover a service.
92+
///
93+
/// # Arguments
94+
/// * `chariott_uri` - Chariott's URI.
95+
/// * `namespace` - The service's namespace.
96+
/// * `name` - The service's name.
97+
/// * `version` - The service's version.
98+
/// # `expected_communication_kind` - The service's expected communication kind.
99+
/// # `expected_communication_reference` - The service's expected communication reference.
100+
pub async fn discover_service_using_chariott(
101+
chariott_uri: &str,
102+
namespace: &str,
103+
name: &str,
104+
version: &str,
105+
expected_communication_kind: &str,
106+
expected_communication_reference: &str,
107+
) -> Result<String, Status> {
108+
let mut client = ServiceRegistryClient::connect(chariott_uri.to_string())
109+
.await
110+
.map_err(|e| Status::internal(e.to_string()))?;
111+
112+
let request = Request::new(DiscoverRequest {
113+
namespace: namespace.to_string(),
114+
name: name.to_string(),
115+
version: version.to_string(),
116+
});
117+
118+
let response = client.discover(request).await?;
119+
120+
let service = response.into_inner().service.ok_or_else(|| Status::not_found("Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version}"))?;
121+
122+
if service.communication_kind != expected_communication_kind
123+
&& service.communication_reference != expected_communication_reference
124+
{
125+
Err(Status::not_found(
126+
"Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version} that has communication kind '{communication_kind} and communication_reference '{communication_reference}''",
127+
))
128+
} else {
129+
Ok(service.uri)
130+
}
131+
}
132+
133+
/// Get a service's URI from settings or from Chariott.
134+
///
135+
/// # Arguments
136+
/// * `service_uri_source` - Enum providing information on how to get the service URI.
137+
/// # `expected_communication_kind` - The service's expected communication kind.
138+
/// # `expected_communication_reference` - The service's expected communication reference.
139+
pub async fn get_service_uri(
140+
service_uri_source: ServiceUriSource,
141+
expected_communication_kind: &str,
142+
expected_communication_reference: &str,
143+
) -> Result<String, Status> {
144+
let result = match service_uri_source {
145+
ServiceUriSource::Local { service_uri } => {
146+
info!("URI set in settings.");
147+
service_uri
148+
}
149+
ServiceUriSource::Chariott { chariott_uri, service_identifier } => {
150+
info!("Retrieving URI from Chariott.");
151+
152+
execute_with_retry(
153+
30,
154+
Duration::from_secs(1),
155+
|| {
156+
discover_service_using_chariott(
157+
&chariott_uri,
158+
&service_identifier.namespace,
159+
&service_identifier.name,
160+
&service_identifier.version,
161+
expected_communication_kind,
162+
expected_communication_reference,
163+
)
164+
},
165+
Some(format!(
166+
"Attempting to discover service '{}' with chariott.",
167+
service_identifier.name
168+
)),
169+
)
170+
.await?
171+
}
172+
};
173+
174+
Ok(result)
175+
}
176+
67177
#[cfg(test)]
68178
mod tests {
69179
use super::*;

core/invehicle-digital-twin/src/main.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use core_protobuf_data_access::chariott::service_discovery::core::v1::{
2020
};
2121
use core_protobuf_data_access::invehicle_digital_twin::v1::invehicle_digital_twin_server::InvehicleDigitalTwinServer;
2222
use env_logger::{Builder, Target};
23-
use futures::Future;
2423
use log::{debug, error, info, LevelFilter};
2524
use parking_lot::RwLock;
2625
use std::boxed::Box;
@@ -89,10 +88,10 @@ async fn register_invehicle_digital_twin_service_with_chariott(
8988
/// 5. Call and return from the block `.add_module()` on the server with the updated middleware and
9089
/// module.
9190
#[allow(unused_assignments, unused_mut)] // Necessary when no extra modules are built.
92-
fn build_server_and_serve<S>(
91+
async fn build_server_and_serve<S>(
9392
addr: SocketAddr,
9493
base_service: S,
95-
) -> impl Future<Output = Result<(), tonic::transport::Error>>
94+
) -> Result<(), Box<dyn std::error::Error>>
9695
where
9796
S: Service<http::Request<Body>, Response = http::Response<BoxBody>, Error = Infallible>
9897
+ NamedService
@@ -107,7 +106,10 @@ where
107106
// (1) Adds the Managed Subscribe module to the service.
108107
let server = {
109108
// (2) Initialize the Managed Subscribe module, which implements GrpcModule.
110-
let managed_subscribe_module = ManagedSubscribeModule::new();
109+
let managed_subscribe_module = ManagedSubscribeModule::new().await.map_err(|error| {
110+
error!("Unable to create Managed Subscribe module.");
111+
error
112+
})?;
111113

112114
// (3) Create interceptor layer to be added to the server.
113115
let managed_subscribe_layer =
@@ -117,6 +119,8 @@ where
117119
let current_middleware = server.middleware.clone();
118120
let new_middleware = current_middleware.layer(managed_subscribe_layer);
119121

122+
info!("Initialized Managed Subscribe module.");
123+
120124
// (5) Add the module with the updated middleware stack to the server.
121125
server.add_module(new_middleware, Box::new(managed_subscribe_module))
122126
};
@@ -125,7 +129,7 @@ where
125129
let builder = server.construct_server().add_service(base_service);
126130

127131
// Start the server.
128-
builder.serve(addr)
132+
builder.serve(addr).await.map_err(|error| error.into())
129133
}
130134

131135
#[tokio::main]

core/module/managed_subscribe/src/managed_subscribe_module.rs

+32-14
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use core_protobuf_data_access::module::managed_subscribe::v1::{
2020
};
2121

2222
use common::grpc_module::GrpcModule;
23-
use common::utils::{execute_with_retry, load_settings};
23+
use common::utils::{execute_with_retry, get_service_uri, load_settings, ServiceUriSource};
2424
use log::{debug, error, info};
2525
use parking_lot::RwLock;
2626
use serde_derive::Deserialize;
@@ -37,6 +37,9 @@ use super::managed_subscribe_interceptor::ManagedSubscribeInterceptor;
3737
const CONFIG_FILENAME: &str = "managed_subscribe_settings";
3838
const SERVICE_PROTOCOL: &str = "grpc";
3939

40+
const MANAGED_SUBSCRIBE_COMMUNICATION_KIND: &str = "mqtt_v5";
41+
const MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE: &str = "pubsub.v1.pubsub.proto";
42+
4043
// Managed Subscribe action constants.
4144
const PUBLISH_ACTION: &str = "PUBLISH";
4245
const STOP_PUBLISH_ACTION: &str = "STOP_PUBLISH";
@@ -58,43 +61,58 @@ pub enum TopicAction {
5861
Delete,
5962
}
6063

64+
/// Settings retrieved from a configuration file.
6165
#[derive(Debug, Deserialize)]
6266
pub struct ConfigSettings {
67+
/// Where to host the Managed Subscribe module.
6368
pub base_authority: String,
64-
pub managed_subscribe_uri: String,
65-
pub chariott_uri: Option<String>,
69+
/// Where to retrieve the Managed Subscribe Service URI from.
70+
pub managed_subscribe_uri_source: ServiceUriSource,
6671
}
6772

73+
/// Struct that handles communication with the Managed Subscribe service.
6874
#[derive(Clone, Debug)]
6975
pub struct ManagedSubscribeModule {
76+
/// The URI of the Managed Subscribe service.
7077
pub managed_subscribe_uri: String,
78+
/// The URI of the Managed Subscribe module.
7179
pub service_uri: String,
80+
/// The protocol used to communicate with the Managed Subscribe module.
7281
pub service_protocol: String,
82+
/// Shared store for the Managed Subscribe module.
7383
pub store: Arc<RwLock<ManagedSubscribeStore>>,
7484
}
7585

76-
impl Default for ManagedSubscribeModule {
77-
fn default() -> Self {
78-
Self::new()
79-
}
80-
}
81-
8286
impl ManagedSubscribeModule {
8387
/// Creates a new managed subscribe module object.
84-
pub fn new() -> Self {
88+
pub async fn new() -> Result<Self, Status> {
8589
// Get module information from the configuration settings.
86-
let config = load_settings::<ConfigSettings>(CONFIG_FILENAME);
90+
let config = load_settings::<ConfigSettings>(CONFIG_FILENAME).map_err(|error| {
91+
Status::internal(format!(
92+
"Unable to load 'Managed Subscribe' config with error: {error}."
93+
))
94+
})?;
8795
let endpoint = config.base_authority;
8896
let service_uri = format!("http://{endpoint}"); // Devskim: ignore DS137138
8997

9098
let store = Arc::new(RwLock::new(ManagedSubscribeStore::new()));
9199

92-
ManagedSubscribeModule {
93-
managed_subscribe_uri: config.managed_subscribe_uri,
100+
info!("Getting Managed Subscribe URI.");
101+
102+
// Get the uri of the managed subscribe service from settings or Chariott.
103+
let managed_subscribe_uri = get_service_uri(
104+
config.managed_subscribe_uri_source,
105+
MANAGED_SUBSCRIBE_COMMUNICATION_KIND,
106+
MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE,
107+
)
108+
.await?;
109+
110+
Ok(ManagedSubscribeModule {
111+
managed_subscribe_uri,
94112
service_uri,
95113
service_protocol: SERVICE_PROTOCOL.to_string(),
96114
store,
97-
}
115+
})
98116
}
99117

100118
/// Creates a new managed subscribe interceptor that shares data with the current instance of

core/module/managed_subscribe/template/managed_subscribe_settings.yaml

+16-5
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,20 @@
66
# Example: "0.0.0.0:80"
77
base_authority: <<value>>
88

9-
# The URI that the Managed Subscribe service listens on for requests.
10-
managed_subscribe_uri: <<value>>
9+
# Information for how to get the Managed Subscribe URI. Only one source can be uncommented at a time.
10+
managed_subscribe_uri_source:
1111

12-
# The URI that the Chariott service listens on for requests.
13-
# If you wish to use Chariott to discover Agemo, then uncomment this setting.
14-
# chariott_uri: <<value>>
12+
# The Managed Subscribe URI will be retrieved from this settings file.
13+
# 'service_uri' - The URI that the Managed Subscribe service listens on for requests.
14+
Local:
15+
service_uri: <<value>>
16+
17+
# The Managed Subscribe URI will be discovered through Chariott.
18+
# 'chariott_uri' - The URI that the Chariott service listens on for requests.
19+
# 'service_identifier' - The service identifier for the Managed Subscribe service.
20+
# Chariott:
21+
# chariott_uri: <<value>>
22+
# service_identifier:
23+
# namespace: <<value>>
24+
# name: <<value>>
25+
# version: <<value>>

samples/managed_subscribe/.accepted_words.txt

+3
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ https
1414
Ibeji
1515
InVehicle
1616
invehicle
17+
md
1718
MQTT
19+
pubsub
1820
repo
21+
sdv
1922
svg
2023
TopicManagementCB
2124
uri

0 commit comments

Comments
 (0)