Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Devkelley/find agemo w chariott #53

Merged
merged 10 commits into from
Oct 17, 2023
2 changes: 2 additions & 0 deletions core/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ parking_lot = { workspace = true }
prost = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_derive = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
regex = {workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
Expand Down
124 changes: 117 additions & 7 deletions core/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,49 @@
// Licensed under the MIT license.
// SPDX-License-Identifier: MIT

use config::{Config, File, FileFormat};
use log::debug;
use config::{Config, ConfigError, File, FileFormat};
use core_protobuf_data_access::chariott::service_discovery::core::v1::{
service_registry_client::ServiceRegistryClient, DiscoverRequest,
};
use log::{debug, info};
use serde_derive::Deserialize;
use std::future::Future;
use strum_macros::Display;
use tokio::time::{sleep, Duration};
use tonic::{Request, Status};

/// An identifier used when discovering a service through Chariott.
#[derive(Debug, Deserialize)]
pub struct ServiceIdentifier {
/// The namespace of the service.
pub namespace: String,
/// The name of the service.
pub name: String,
/// The version of the service.
pub version: String,
}

/// An enum representing where to discover a service's URI.
#[derive(Display, Debug, Deserialize)]
pub enum ServiceUriSource {
/// Use the local configuration settings to find the service's URI.
Local { service_uri: String },
/// Use Chariott to discover the service's URI.
Chariott { chariott_uri: String, service_identifier: ServiceIdentifier },
}

/// Load the settings.
///
/// # Arguments
/// * `config_filename` - Name of the config file to load settings from.
pub fn load_settings<T>(config_filename: &str) -> T
pub fn load_settings<T>(config_filename: &str) -> Result<T, ConfigError>
where
T: for<'de> serde::Deserialize<'de>,
{
let config =
Config::builder().add_source(File::new(config_filename, FileFormat::Yaml)).build().unwrap();

let settings: T = config.try_deserialize().unwrap();
Config::builder().add_source(File::new(config_filename, FileFormat::Yaml)).build()?;

settings
config.try_deserialize()
}

/// Retry a function that returns an error.
Expand Down Expand Up @@ -64,6 +88,92 @@ where
last_error
}

/// Use Chariott to discover a service.
///
/// # Arguments
/// * `chariott_uri` - Chariott's URI.
/// * `namespace` - The service's namespace.
/// * `name` - The service's name.
/// * `version` - The service's version.
/// # `expected_communication_kind` - The service's expected communication kind.
/// # `expected_communication_reference` - The service's expected communication reference.
pub async fn discover_service_using_chariott(
chariott_uri: &str,
namespace: &str,
name: &str,
version: &str,
expected_communication_kind: &str,
expected_communication_reference: &str,
) -> Result<String, Status> {
let mut client = ServiceRegistryClient::connect(chariott_uri.to_string())
.await
.map_err(|e| Status::internal(e.to_string()))?;

let request = Request::new(DiscoverRequest {
namespace: namespace.to_string(),
name: name.to_string(),
version: version.to_string(),
});

let response = client.discover(request).await?;

let service = response.into_inner().service.ok_or_else(|| Status::not_found("Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version}"))?;

if service.communication_kind != expected_communication_kind
&& service.communication_reference != expected_communication_reference
{
Err(Status::not_found(
"Did not find a service in Chariott with namespace '{namespace}', name '{name}' and version {version} that has communication kind '{communication_kind} and communication_reference '{communication_reference}''",
))
} else {
Ok(service.uri)
}
}

/// Get a service's URI from settings or from Chariott.
///
/// # Arguments
/// * `service_uri_source` - Enum providing information on how to get the service URI.
/// # `expected_communication_kind` - The service's expected communication kind.
/// # `expected_communication_reference` - The service's expected communication reference.
pub async fn get_service_uri(
service_uri_source: ServiceUriSource,
expected_communication_kind: &str,
expected_communication_reference: &str,
) -> Result<String, Status> {
let result = match service_uri_source {
ServiceUriSource::Local { service_uri } => {
info!("URI set in settings.");
service_uri
}
ServiceUriSource::Chariott { chariott_uri, service_identifier } => {
info!("Retrieving URI from Chariott.");

execute_with_retry(
30,
Duration::from_secs(1),
|| {
discover_service_using_chariott(
&chariott_uri,
&service_identifier.namespace,
&service_identifier.name,
&service_identifier.version,
expected_communication_kind,
expected_communication_reference,
)
},
Some(format!(
"Attempting to discover service '{}' with chariott.",
service_identifier.name
)),
)
.await?
}
};

Ok(result)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
14 changes: 9 additions & 5 deletions core/invehicle-digital-twin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use core_protobuf_data_access::chariott::service_discovery::core::v1::{
};
use core_protobuf_data_access::invehicle_digital_twin::v1::invehicle_digital_twin_server::InvehicleDigitalTwinServer;
use env_logger::{Builder, Target};
use futures::Future;
use log::{debug, error, info, LevelFilter};
use parking_lot::RwLock;
use std::boxed::Box;
Expand Down Expand Up @@ -89,10 +88,10 @@ async fn register_invehicle_digital_twin_service_with_chariott(
/// 5. Call and return from the block `.add_module()` on the server with the updated middleware and
/// module.
#[allow(unused_assignments, unused_mut)] // Necessary when no extra modules are built.
fn build_server_and_serve<S>(
async fn build_server_and_serve<S>(
addr: SocketAddr,
base_service: S,
) -> impl Future<Output = Result<(), tonic::transport::Error>>
) -> Result<(), Box<dyn std::error::Error>>
where
S: Service<http::Request<Body>, Response = http::Response<BoxBody>, Error = Infallible>
+ NamedService
Expand All @@ -107,7 +106,10 @@ where
// (1) Adds the Managed Subscribe module to the service.
let server = {
// (2) Initialize the Managed Subscribe module, which implements GrpcModule.
let managed_subscribe_module = ManagedSubscribeModule::new();
let managed_subscribe_module = ManagedSubscribeModule::new().await.map_err(|error| {
error!("Unable to create Managed Subscribe module.");
error
})?;

// (3) Create interceptor layer to be added to the server.
let managed_subscribe_layer =
Expand All @@ -117,6 +119,8 @@ where
let current_middleware = server.middleware.clone();
let new_middleware = current_middleware.layer(managed_subscribe_layer);

info!("Initialized Managed Subscribe module.");

// (5) Add the module with the updated middleware stack to the server.
server.add_module(new_middleware, Box::new(managed_subscribe_module))
};
Expand All @@ -125,7 +129,7 @@ where
let builder = server.construct_server().add_service(base_service);

// Start the server.
builder.serve(addr)
builder.serve(addr).await.map_err(|error| error.into())
}

#[tokio::main]
Expand Down
46 changes: 32 additions & 14 deletions core/module/managed_subscribe/src/managed_subscribe_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use core_protobuf_data_access::module::managed_subscribe::v1::{
};

use common::grpc_module::GrpcModule;
use common::utils::{execute_with_retry, load_settings};
use common::utils::{execute_with_retry, get_service_uri, load_settings, ServiceUriSource};
use log::{debug, error, info};
use parking_lot::RwLock;
use serde_derive::Deserialize;
Expand All @@ -37,6 +37,9 @@ use super::managed_subscribe_interceptor::ManagedSubscribeInterceptor;
const CONFIG_FILENAME: &str = "managed_subscribe_settings";
const SERVICE_PROTOCOL: &str = "grpc";

const MANAGED_SUBSCRIBE_COMMUNICATION_KIND: &str = "mqtt_v5";
const MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE: &str = "pubsub.v1.pubsub.proto";

// Managed Subscribe action constants.
const PUBLISH_ACTION: &str = "PUBLISH";
const STOP_PUBLISH_ACTION: &str = "STOP_PUBLISH";
Expand All @@ -58,43 +61,58 @@ pub enum TopicAction {
Delete,
}

/// Settings retrieved from a configuration file.
#[derive(Debug, Deserialize)]
pub struct ConfigSettings {
/// Where to host the Managed Subscribe module.
pub base_authority: String,
pub managed_subscribe_uri: String,
pub chariott_uri: Option<String>,
/// Where to retrieve the Managed Subscribe Service URI from.
pub managed_subscribe_uri_source: ServiceUriSource,
}

/// Struct that handles communication with the Managed Subscribe service.
#[derive(Clone, Debug)]
pub struct ManagedSubscribeModule {
/// The URI of the Managed Subscribe service.
pub managed_subscribe_uri: String,
/// The URI of the Managed Subscribe module.
pub service_uri: String,
/// The protocol used to communicate with the Managed Subscribe module.
pub service_protocol: String,
/// Shared store for the Managed Subscribe module.
pub store: Arc<RwLock<ManagedSubscribeStore>>,
}

impl Default for ManagedSubscribeModule {
fn default() -> Self {
Self::new()
}
}

impl ManagedSubscribeModule {
/// Creates a new managed subscribe module object.
pub fn new() -> Self {
pub async fn new() -> Result<Self, Status> {
// Get module information from the configuration settings.
let config = load_settings::<ConfigSettings>(CONFIG_FILENAME);
let config = load_settings::<ConfigSettings>(CONFIG_FILENAME).map_err(|error| {
Status::internal(format!(
"Unable to load 'Managed Subscribe' config with error: {error}."
))
})?;
let endpoint = config.base_authority;
let service_uri = format!("http://{endpoint}"); // Devskim: ignore DS137138

let store = Arc::new(RwLock::new(ManagedSubscribeStore::new()));

ManagedSubscribeModule {
managed_subscribe_uri: config.managed_subscribe_uri,
info!("Getting Managed Subscribe URI.");

// Get the uri of the managed subscribe service from settings or Chariott.
let managed_subscribe_uri = get_service_uri(
config.managed_subscribe_uri_source,
MANAGED_SUBSCRIBE_COMMUNICATION_KIND,
MANAGED_SUBSCRIBE_COMMUNICATION_REFERENCE,
)
.await?;

Ok(ManagedSubscribeModule {
managed_subscribe_uri,
service_uri,
service_protocol: SERVICE_PROTOCOL.to_string(),
store,
}
})
}

/// Creates a new managed subscribe interceptor that shares data with the current instance of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,20 @@
# Example: "0.0.0.0:80"
base_authority: <<value>>

# The URI that the Managed Subscribe service listens on for requests.
managed_subscribe_uri: <<value>>
# Information for how to get the Managed Subscribe URI. Only one source can be uncommented at a time.
managed_subscribe_uri_source:

# The URI that the Chariott service listens on for requests.
# If you wish to use Chariott to discover Agemo, then uncomment this setting.
# chariott_uri: <<value>>
# The Managed Subscribe URI will be retrieved from this settings file.
# 'service_uri' - The URI that the Managed Subscribe service listens on for requests.
Local:
service_uri: <<value>>

# The Managed Subscribe URI will be discovered through Chariott.
# 'chariott_uri' - The URI that the Chariott service listens on for requests.
# 'service_identifier' - The service identifier for the Managed Subscribe service.
# Chariott:
# chariott_uri: <<value>>
# service_identifier:
# namespace: <<value>>
# name: <<value>>
# version: <<value>>
3 changes: 3 additions & 0 deletions samples/managed_subscribe/.accepted_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ https
Ibeji
InVehicle
invehicle
md
MQTT
pubsub
repo
sdv
svg
TopicManagementCB
uri
Expand Down
Loading