Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service: Revise DiscoveryClient API #375

Merged
merged 7 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 104 additions & 68 deletions ni_measurementlink_service/_internal/discovery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Any, Dict, Optional

import grpc
from deprecation import deprecated

from ni_measurementlink_service._internal.stubs.ni.measurementlink.discovery.v1 import (
discovery_service_pb2,
Expand Down Expand Up @@ -54,33 +55,38 @@ def ssl_authenticated_address(self) -> str:


class DiscoveryClient:
"""Class that contains APIs need to interact with discovery service.

Attributes
----------
stub (DiscoveryServiceStub): The gRPC stub used to interact with the discovery
service.

registration_id(string): The ID from discovery service upon successful registration.

"""
"""Client for accessing the MeasurementLink discovery service."""

def __init__(
self, stub: Optional[discovery_service_pb2_grpc.DiscoveryServiceStub] = None
) -> None:
"""Initialize the Discovery Client with provided registry service stub.
"""Initialize the discovery client.

Args:
stub (DiscoveryServiceStub, optional): The gRPC stub to interact with discovery
service. Defaults to None.

stub: An optional discovery service gRPC stub for unit testing.
"""
self._stub = stub
self.registration_id = ""
self._registration_id = ""

@property
@deprecated(
deprecated_in="1.2.0-dev2",
details="This property should not be public and will be removed in a later release.",
)
def registration_id(self) -> str:
""" "The ID from discovery service upon successful registration."""
return self._registration_id

@property
@deprecated(
deprecated_in="1.2.0-dev2",
details="This property should not be public and will be removed in a later release.",
)
def stub(self) -> discovery_service_pb2_grpc.DiscoveryServiceStub:
"""Get the gRPC stub used to interact with the discovery service."""
return self._get_stub()

def _get_stub(self) -> discovery_service_pb2_grpc.DiscoveryServiceStub:
if self._stub is None:
address = _get_discovery_service_address()
channel = grpc.insecure_channel(address)
Expand All @@ -89,45 +95,71 @@ def stub(self) -> discovery_service_pb2_grpc.DiscoveryServiceStub:
self._stub = discovery_service_pb2_grpc.DiscoveryServiceStub(channel)
return self._stub

@deprecated(deprecated_in="1.2.0-dev2", details="Use register_service instead.")
def register_measurement_service(
self, service_port: str, service_info: ServiceInfo, measurement_info: MeasurementInfo
) -> bool:
"""Register the measurement service with the discovery service.

Args:
----
service_port (str): Port Number of the measurement service.
service_port: The port number of the service.

service_info (ServiceInfo): Service Info.
service_info: Information describing the service.

display_name (str): Display name of the service.
measurement_info: Information describing the measurement.

Returns
-------
bool: Boolean to represent if the registration is successful.
Returns:
Boolean indicating whether the service was successfully registered.
"""
if self._registration_id:
raise RuntimeError("Service already registered")

service_location = ServiceLocation(
location="localhost",
insecure_port=service_port,
ssl_authenticated_port="",
)

self._registration_id = self.register_service(
service_info._replace(display_name=measurement_info.display_name),
service_location,
)
return True

def register_service(self, service_info: ServiceInfo, service_location: ServiceLocation) -> str:
"""Register the specified service with the discovery service.

Args:
service_info: Information describing the service.

service_location: The location of the service on the network.

Returns:
ID that can be used to unregister the service.
"""
try:
# Service Location
service_location = discovery_service_pb2.ServiceLocation()
service_location.location = "localhost"
service_location.insecure_port = service_port
# Service Descriptor
service_descriptor = discovery_service_pb2.ServiceDescriptor()
service_descriptor.display_name = measurement_info.display_name
service_descriptor.service_class = service_info.service_class
service_descriptor.description_url = service_info.description_url
service_descriptor.provided_interfaces.extend(service_info.provided_interfaces)
service_descriptor.annotations.update(service_info.annotations)

# Registration Request Creation
grpc_service_description = discovery_service_pb2.ServiceDescriptor(
display_name=service_info.display_name,
description_url=service_info.description_url,
provided_interfaces=service_info.provided_interfaces,
service_class=service_info.service_class,
annotations=service_info.annotations,
)

grpc_service_location = discovery_service_pb2.ServiceLocation(
location=service_location.location,
insecure_port=service_location.insecure_port,
ssl_authenticated_port=service_location.ssl_authenticated_port,
)

request = discovery_service_pb2.RegisterServiceRequest(
location=service_location, service_description=service_descriptor
service_description=grpc_service_description,
location=grpc_service_location,
)
# Registration RPC Call
register_response = self.stub.RegisterService(request)
self.registration_id = register_response.registration_id

response = self._get_stub().RegisterService(request)
_logger.info("Successfully registered with discovery service.")
return response.registration_id
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
_logger.error(
Expand All @@ -144,30 +176,38 @@ def register_measurement_service(
except Exception:
_logger.exception("Error in registering with discovery service.")
raise
return True

def unregister_service(self) -> bool:
"""Un-registers the measurement service from the discovery service.
def unregister_service(self, registration_id: str = "") -> bool:
"""Unregisters the specified service from the discovery service.

Should be called before the service is closed.
This method should be called before the service exits.

Returns
-------
bool: Boolean to represent if the un-registration is successful.
Args:
registration_id: The registration ID returned from register_service.
This argument should be omitted after calling the deprecated
register_measurement_service method.

Returns:
Boolean indicating whether the service was unregistered.
"""
try:
if self.registration_id:
# Un-registration Request Creation
request = discovery_service_pb2.UnregisterServiceRequest(
registration_id=self.registration_id
)
# Un-registration RPC Call
self.stub.UnregisterService(request)
_logger.info("Successfully unregistered with discovery service.")
else:
_logger.info("Not registered with discovery service.")
return False
if not registration_id:
registration_id = self._registration_id
if not registration_id:
_logger.info("Not registered with discovery service.")
return False

request = discovery_service_pb2.UnregisterServiceRequest(
registration_id=registration_id
)

_ = self._get_stub().UnregisterService(request)
_logger.info("Successfully unregistered with discovery service.")

if registration_id == self._registration_id:
self._registration_id = ""

return True
except grpc.RpcError as e:
if e.code() == grpc.StatusCode.UNAVAILABLE:
_logger.error(
Expand All @@ -184,7 +224,6 @@ def unregister_service(self) -> bool:
except Exception:
_logger.exception("Error in unregistering with discovery service.")
raise
return True

def resolve_service(self, provided_interface: str, service_class: str = "") -> ServiceLocation:
"""Resolve the location of a service.
Expand All @@ -194,22 +233,19 @@ def resolve_service(self, provided_interface: str, service_class: str = "") -> S
discovery service if it has not already been started.

Args:
----
provided_interface: The gRPC Full Name of the service.
provided_interface: The gRPC full name of the service.
service_class: The service "class" that should be matched. If the value is not
specified and there is more than one matching service registered, an error
is returned.

Returns
-------
A ServiceLocation location object that represents the location of a service.

Returns:
The service location.
"""
request = discovery_service_pb2.ResolveServiceRequest()
request.provided_interface = provided_interface
request.service_class = service_class
request = discovery_service_pb2.ResolveServiceRequest(
provided_interface=provided_interface, service_class=service_class
)

response: discovery_service_pb2.ServiceLocation = self.stub.ResolveService(request)
response = self._get_stub().ResolveService(request)

return ServiceLocation(
location=response.location,
Expand Down
14 changes: 11 additions & 3 deletions ni_measurementlink_service/_internal/service_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import grpc
from grpc.framework.foundation import logging_pool

from ni_measurementlink_service._internal.discovery_client import DiscoveryClient
from ni_measurementlink_service._internal.discovery_client import (
DiscoveryClient,
ServiceLocation,
)
from ni_measurementlink_service._internal.grpc_servicer import (
MeasurementServiceServicerV1,
MeasurementServiceServicerV2,
Expand Down Expand Up @@ -48,6 +51,7 @@ def __init__(self, discovery_client: Optional[DiscoveryClient] = None) -> None:

"""
self.discovery_client = discovery_client or DiscoveryClient()
self._registration_id = ""

def start(
self,
Expand Down Expand Up @@ -115,13 +119,17 @@ def start(
port = str(self.server.add_insecure_port("[::]:0"))
self.server.start()
_logger.info("Measurement service hosted on port: %s", port)
self.discovery_client.register_measurement_service(port, service_info, measurement_info)

service_location = ServiceLocation("localhost", port, "")
self._registration_id = self.discovery_client.register_service(
service_info, service_location
)

self.port = port
return port

def stop(self) -> None:
"""Close the Service after un-registering with discovery service and cleanups."""
self.discovery_client.unregister_service()
self.discovery_client.unregister_service(self._registration_id)
self.server.stop(5)
_logger.info("Measurement service closed.")
3 changes: 3 additions & 0 deletions ni_measurementlink_service/measurement/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class ServiceInfo(NamedTuple):
- Example: "[\"powerup\", \"current\"]"
"""

display_name: str = ""
"""The service display name for clients to display to users."""


class TypeSpecialization(enum.Enum):
"""Enum that represents the type specializations for measurement parameters."""
Expand Down
1 change: 1 addition & 0 deletions ni_measurementlink_service/measurement/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def convert_value_to_str(value: object) -> str:
}

self.service_info = ServiceInfo(
display_name=service["displayName"],
service_class=service["serviceClass"],
description_url=service["descriptionUrl"],
provided_interfaces=service["providedInterfaces"],
Expand Down
Loading