Skip to content

Commit 7e637c3

Browse files
committed
Digital Twin Graph
1 parent 2cb4bce commit 7e637c3

File tree

1 file changed

+130
-160
lines changed

1 file changed

+130
-160
lines changed

core/module/digital_twin_graph/src/digital_twin_graph_impl.rs

+130-160
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,95 @@ impl DigitalTwinGraphImpl {
151151
})
152152
.collect())
153153
}
154+
155+
/// Send an ask to the provider.
156+
///
157+
/// # Arguments
158+
/// * `client` - The client to use to send the ask.
159+
/// * `respond_uri` - The respond uri.
160+
/// * `ask_id` - The ask id.
161+
/// * `instance_id` - The instance id.
162+
/// * `member_path` - The member path.
163+
/// * `operation` - The operation.
164+
/// * `payload` - The payload.
165+
pub async fn send_ask(
166+
&self,
167+
mut client: RequestClient<tonic::transport::Channel>,
168+
respond_uri: &str,
169+
ask_id: &str,
170+
instance_id: &str,
171+
member_path: &str,
172+
operation: &str,
173+
payload: &str,
174+
) -> Result<(), tonic::Status> {
175+
let targeted_payload = TargetedPayload {
176+
instance_id: instance_id.to_string(),
177+
member_path: member_path.to_string(),
178+
operation: operation.to_string(),
179+
payload: payload.to_string(),
180+
};
181+
182+
// Serialize the targeted payload.
183+
let targeted_payload_json = serde_json::to_string_pretty(&targeted_payload).unwrap();
184+
185+
let request = tonic::Request::new(AskRequest {
186+
respond_uri: respond_uri.to_string(),
187+
ask_id: ask_id.to_string(),
188+
payload: targeted_payload_json.clone(),
189+
});
190+
191+
// Send the ask.
192+
let response = client.ask(request).await;
193+
if let Err(status) = response {
194+
return Err(tonic::Status::internal(format!("Unable to call ask, due to {status:?}")));
195+
}
196+
197+
Ok(())
198+
}
199+
200+
/// Wait for the answer.
201+
///
202+
/// # Arguments
203+
/// * `ask_id` - The ask id.
204+
/// * `rx` - The receiver for the asynchronous channel for AnswerRequest's.
205+
pub async fn wait_for_answer(
206+
&self,
207+
ask_id: String,
208+
rx: &mut broadcast::Receiver<AnswerRequest>,
209+
) -> Result<AnswerRequest, tonic::Status> {
210+
let mut answer_request: AnswerRequest = Default::default();
211+
let mut attempts_after_failure = 0;
212+
const MAX_ATTEMPTS_AFTER_FAILURE: u8 = 10;
213+
while attempts_after_failure < MAX_ATTEMPTS_AFTER_FAILURE {
214+
match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv()).await {
215+
Ok(Ok(request)) => {
216+
if ask_id == request.ask_id {
217+
// We have received the answer request that we are expecting.
218+
answer_request = request;
219+
break;
220+
} else {
221+
// Ignore this answer request, as it is not the one that we are expecting.
222+
// Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep.
223+
continue;
224+
}
225+
}
226+
Ok(Err(error_message)) => {
227+
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
228+
sleep(Duration::from_secs(1)).await;
229+
attempts_after_failure += 1;
230+
continue;
231+
}
232+
Err(error_message) => {
233+
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
234+
sleep(Duration::from_secs(1)).await;
235+
attempts_after_failure += 1;
236+
continue;
237+
}
238+
}
239+
}
240+
241+
Ok(answer_request)
242+
}
154243
}
155244

156245
#[tonic::async_trait]
@@ -210,66 +299,25 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
210299
warn!("Unable to connect. We will skip this one.");
211300
continue;
212301
}
213-
let mut client = client_result.unwrap();
302+
let client = client_result.unwrap();
214303

215304
// Note: The ask id must be a universally unique value.
216305
let ask_id = Uuid::new_v4().to_string();
217306

218-
let targeted_payload = TargetedPayload {
219-
instance_id: instance_id.clone(),
220-
member_path: "".to_string(),
221-
operation: digital_twin_operation::GET.to_string(),
222-
payload: "".to_string(),
223-
};
224-
225-
// Serialize the targeted payload.
226-
let targeted_payload_json = serde_json::to_string_pretty(&targeted_payload).unwrap();
227-
228-
let request = tonic::Request::new(AskRequest {
229-
respond_uri: self.respond_uri.clone(),
230-
ask_id: ask_id.clone(),
231-
payload: targeted_payload_json.clone(),
232-
});
233-
234307
// Send the ask.
235-
let response = client.ask(request).await;
236-
if let Err(status) = response {
237-
warn!("Unable to call ask, due to {status:?}\nWe will skip this one.");
238-
continue;
239-
}
308+
self.send_ask(
309+
client,
310+
&self.respond_uri,
311+
&ask_id,
312+
&instance_id,
313+
"",
314+
digital_twin_operation::GET,
315+
"",
316+
)
317+
.await?;
240318

241-
// Wait for the answer request.
242-
let mut answer_request: AnswerRequest = Default::default();
243-
let mut attempts_after_failure = 0;
244-
while attempts_after_failure < Self::MAX_RETRIES {
245-
match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv())
246-
.await
247-
{
248-
Ok(Ok(request)) => {
249-
if ask_id == request.ask_id {
250-
// We have received the answer request that we are expecting.
251-
answer_request = request;
252-
break;
253-
} else {
254-
// Ignore this answer request, as it is not the one that we are expecting.
255-
// Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep.
256-
continue;
257-
}
258-
}
259-
Ok(Err(error_message)) => {
260-
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
261-
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
262-
attempts_after_failure += 1;
263-
continue;
264-
}
265-
Err(error_message) => {
266-
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
267-
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
268-
attempts_after_failure += 1;
269-
continue;
270-
}
271-
}
272-
}
319+
// Wait for the answer.
320+
let answer_request = self.wait_for_answer(ask_id, &mut rx).await?;
273321

274322
debug!(
275323
"Received an answer request. The ask_id is '{}'. The payload is '{}'",
@@ -325,64 +373,25 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
325373
if client_result.is_err() {
326374
return Err(tonic::Status::internal("Unable to connect to the provider."));
327375
}
328-
let mut client = client_result.unwrap();
376+
let client = client_result.unwrap();
329377

330378
// Note: The ask id must be a universally unique value.
331379
let ask_id = Uuid::new_v4().to_string();
332380

333-
// Create the targeted payload. Note: The member path is not used when the operation is GET.
334-
let targeted_payload = TargetedPayload {
335-
instance_id: instance_id.clone(),
336-
member_path: member_path.clone(),
337-
operation: digital_twin_operation::GET.to_string(),
338-
payload: "".to_string(),
339-
};
340-
341-
// Serialize the targeted payload.
342-
let targeted_payload_json = serde_json::to_string_pretty(&targeted_payload).unwrap();
343-
344-
let request = tonic::Request::new(AskRequest {
345-
respond_uri: self.respond_uri.clone(),
346-
ask_id: ask_id.clone(),
347-
payload: targeted_payload_json.clone(),
348-
});
349-
350381
// Send the ask.
351-
let response = client.ask(request).await;
352-
if let Err(status) = response {
353-
return Err(tonic::Status::internal(format!("Unable to call ask, due to {status:?}")));
354-
}
355-
356-
// Wait for the answer request.
357-
let mut answer_request: AnswerRequest = Default::default();
358-
let mut attempts_after_failure = 0;
359-
while attempts_after_failure < Self::MAX_RETRIES {
360-
match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv()).await {
361-
Ok(Ok(request)) => {
362-
if ask_id == request.ask_id {
363-
// We have received the answer request that we are expecting.
364-
answer_request = request;
365-
break;
366-
} else {
367-
// Ignore this answer request, as it is not the one that we are expecting.
368-
// Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep.
369-
continue;
370-
}
371-
}
372-
Ok(Err(error_message)) => {
373-
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
374-
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
375-
attempts_after_failure += 1;
376-
continue;
377-
}
378-
Err(error_message) => {
379-
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
380-
sleep(Duration::from_millis(Self::BACKOFF_BASE_DURATION_IN_MILLIS)).await;
381-
attempts_after_failure += 1;
382-
continue;
383-
}
384-
}
385-
}
382+
self.send_ask(
383+
client,
384+
&self.respond_uri,
385+
&ask_id,
386+
&instance_id,
387+
&member_path,
388+
digital_twin_operation::GET,
389+
"",
390+
)
391+
.await?;
392+
393+
// Wait for the answer.
394+
let answer_request = self.wait_for_answer(ask_id, &mut rx).await?;
386395

387396
debug!(
388397
"Received an answer request. The ask_id is '{}'. The payload is '{}",
@@ -446,64 +455,25 @@ impl DigitalTwinGraph for DigitalTwinGraphImpl {
446455
if client_result.is_err() {
447456
return Err(tonic::Status::internal("Unable to connect to the provider."));
448457
}
449-
let mut client = client_result.unwrap();
458+
let client = client_result.unwrap();
450459

451460
// Note: The ask id must be a universally unique value.
452461
let ask_id = Uuid::new_v4().to_string();
453462

454-
let targeted_payload = TargetedPayload {
455-
instance_id: instance_id.clone(),
456-
member_path: member_path.clone(),
457-
operation: digital_twin_operation::INVOKE.to_string(),
458-
payload: request_payload.to_string(),
459-
};
460-
461-
// Serialize the targeted payload.
462-
let targeted_payload_json = serde_json::to_string_pretty(&targeted_payload).unwrap();
463-
464-
let request = tonic::Request::new(AskRequest {
465-
respond_uri: self.respond_uri.clone(),
466-
ask_id: ask_id.clone(),
467-
payload: targeted_payload_json.clone(),
468-
});
469-
470463
// Send the ask.
471-
let response = client.ask(request).await;
472-
if let Err(status) = response {
473-
return Err(tonic::Status::internal(format!("Unable to call ask, due to {status:?}")));
474-
}
475-
476-
// Wait for the answer request.
477-
let mut answer_request: AnswerRequest = Default::default();
478-
let mut attempts_after_failure = 0;
479-
const MAX_ATTEMPTS_AFTER_FAILURE: u8 = 10;
480-
while attempts_after_failure < MAX_ATTEMPTS_AFTER_FAILURE {
481-
match timeout(Duration::from_millis(Self::TIMEOUT_PERIOD_IN_MILLIS), rx.recv()).await {
482-
Ok(Ok(request)) => {
483-
if ask_id == request.ask_id {
484-
// We have received the answer request that we are expecting.
485-
answer_request = request;
486-
break;
487-
} else {
488-
// Ignore this answer request, as it is not the one that we are expecting.
489-
// Immediately try again. This was not a failure, so we do not increment attempts_after_failure or sleep.
490-
continue;
491-
}
492-
}
493-
Ok(Err(error_message)) => {
494-
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
495-
sleep(Duration::from_secs(1)).await;
496-
attempts_after_failure += 1;
497-
continue;
498-
}
499-
Err(error_message) => {
500-
warn!("Failed to receive the answer request. The error message is '{}'. We may retry in a moment.", error_message);
501-
sleep(Duration::from_secs(1)).await;
502-
attempts_after_failure += 1;
503-
continue;
504-
}
505-
}
506-
}
464+
self.send_ask(
465+
client,
466+
&self.respond_uri,
467+
&ask_id,
468+
&instance_id,
469+
&member_path,
470+
digital_twin_operation::INVOKE,
471+
&request_payload,
472+
)
473+
.await?;
474+
475+
// Wait for the answer.
476+
let answer_request = self.wait_for_answer(ask_id, &mut rx).await?;
507477

508478
debug!(
509479
"Received an answer request. The ask_id is '{}'. The payload is '{}",

0 commit comments

Comments
 (0)