Source code for ni_measurementlink_service._internal.discovery_client

""" Contains API to register and un-register measurement service with discovery service.
"""
import json
import logging
import os
import pathlib
import sys
import typing
from typing import Optional

import grpc

from ni_measurementlink_service._internal.stubs.ni.measurementlink.discovery.v1 import (
    discovery_service_pb2,
    discovery_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.info import MeasurementInfo, ServiceInfo

if sys.platform == "win32":
    import errno
    import msvcrt

    import win32con
    import win32file
    import winerror


_PROVIDED_MEASUREMENT_SERVICE = "ni.measurementlink.measurement.v1.MeasurementService"

_logger = logging.getLogger(__name__)


[docs]class ServiceLocation(typing.NamedTuple): """Represents the location of a service.""" location: str insecure_port: str ssl_authenticated_port: str @property def insecure_address(self) -> str: """Get the service's insecure address in the format host:port.""" return f"{self.location}:{self.insecure_port}" @property def ssl_authenticated_address(self) -> str: """Get the service's SSL-authenticated address in the format host:port.""" return f"{self.location}:{self.ssl_authenticated_port}"
[docs]class DiscoveryClient: """Class that contains APIs need to interact with discovery service. Attributes ---------- stub (DiscoveryServiceStub): The gRPC stub used to interact with the discovery service. registration_id(string): The ID from discovery service upon successful registration. """ def __init__(self, stub: Optional[discovery_service_pb2_grpc.DiscoveryServiceStub] = None): """Initialize the Discovery Client with provided registry service stub. Args: stub (DiscoveryServiceStub, optional): The gRPC stub to interact with discovery service. Defaults to None. """ self._stub = stub self.registration_id = "" @property def stub(self) -> discovery_service_pb2_grpc.DiscoveryServiceStub: """Get the gRPC stub used to interact with the discovery service.""" if self._stub is None: address = _get_discovery_service_address() channel = grpc.insecure_channel(address) self._stub = discovery_service_pb2_grpc.DiscoveryServiceStub(channel) return self._stub
[docs] def register_measurement_service( self, service_port: str, service_info: ServiceInfo, measurement_info: MeasurementInfo ) -> bool: """Register the measurement service with the discovery service. Args: ---- service_port (str): Port Number of the measurement service. service_info (ServiceInfo): Service Info. display_name (str): Display name of the service. Returns ------- bool: Boolean to represent if the registration is successful. """ try: # Service Location service_location = discovery_service_pb2.ServiceLocation() service_location.location = "localhost" service_location.insecure_port = service_port # Service Descriptor service_descriptor = discovery_service_pb2.ServiceDescriptor() service_descriptor.display_name = measurement_info.display_name service_descriptor.service_class = service_info.service_class service_descriptor.description_url = service_info.description_url service_descriptor.provided_interfaces.append(_PROVIDED_MEASUREMENT_SERVICE) # Registration Request Creation request = discovery_service_pb2.RegisterServiceRequest( location=service_location, service_description=service_descriptor ) # Registration RPC Call register_response = self.stub.RegisterService(request) self.registration_id = register_response.registration_id _logger.info("Successfully registered with discovery service.") except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: _logger.error( "Unable to register with discovery service. Possible reason: discovery service not available." ) else: _logger.exception("Error in registering with discovery service.") return False except FileNotFoundError: _logger.error( "Unable to register with discovery service. Possible reason: discovery service not running." ) except Exception: _logger.exception("Error in registering with discovery service.") return False return True
[docs] def unregister_service(self) -> bool: """Un-registers the measurement service from the discovery service. Should be called before the service is closed. Returns ------- bool: Boolean to represent if the un-registration is successful. """ try: if self.registration_id: # Un-registration Request Creation request = discovery_service_pb2.UnregisterServiceRequest( registration_id=self.registration_id ) # Un-registration RPC Call self.stub.UnregisterService(request) _logger.info("Successfully unregistered with discovery service.") else: _logger.info("Not registered with discovery service.") except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: _logger.error( "Unable to unregister with discovery service. Possible reason: discovery service not available." ) else: _logger.exception("Error in unregistering with discovery service.") return False except FileNotFoundError: _logger.error( "Unable to unregister with discovery service. Possible reason: discovery service not running." ) except Exception: _logger.exception("Error in unregistering with discovery service.") return False return True
[docs] def resolve_service(self, provided_interface: str, service_class: str = "") -> ServiceLocation: """Resolve the location of a service. Given a description of a service, returns information that can be used to establish communication with that service. If necessary, the service will be started by the discovery service if it has not already been started. Args: ---- provided_interface: The gRPC Full Name of the service. service_class: The service "class" that should be matched. If the value is not specified and there is more than one matching service registered, an error is returned. Returns ------- A ServiceLocation location object that represents the location of a service. """ request = discovery_service_pb2.ResolveServiceRequest() request.provided_interface = provided_interface request.service_class = service_class response: discovery_service_pb2.ServiceLocation = self.stub.ResolveService(request) return ServiceLocation( location=response.location, insecure_port=response.insecure_port, ssl_authenticated_port=response.ssl_authenticated_port, )
def _get_discovery_service_address() -> str: key_file_path = _get_key_file_path() _logger.debug("Discovery service key file path: %s", key_file_path) with _open_key_file(str(key_file_path)) as key_file: key_json = json.load(key_file) return "localhost:" + key_json["InsecurePort"] def _get_key_file_path(cluster_id: Optional[str] = None) -> pathlib.Path: if cluster_id is not None: return _get_key_file_directory() / f"DiscoveryService_{cluster_id}.json" return _get_key_file_directory() / "DiscoveryService.json" def _get_key_file_directory() -> pathlib.Path: version = discovery_service_pb2.DESCRIPTOR.package.split(".")[-1] if sys.platform == "win32": return ( pathlib.Path(os.environ["ProgramData"]) / "National Instruments" / "MeasurementLink" / "Discovery" / version ) else: raise NotImplementedError("Platform not supported") def _open_key_file(path: str) -> typing.TextIO: if sys.platform == "win32": # Use the Win32 API to specify the share mode. Otherwise, opening the file throws # PermissionError due to a sharing violation. This is a workaround for # https://github.com/python/cpython/issues/59449 # (Support for opening files with FILE_SHARE_DELETE on Windows). try: win32_file_handle = win32file.CreateFile( path, win32file.GENERIC_READ, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE | win32file.FILE_SHARE_DELETE, None, win32con.OPEN_EXISTING, 0, None, ) except win32file.error as e: if e.winerror == winerror.ERROR_FILE_NOT_FOUND: raise FileNotFoundError(errno.ENOENT, e.strerror, path) from e elif ( e.winerror == winerror.ERROR_ACCESS_DENIED or e.winerror == winerror.ERROR_SHARING_VIOLATION ): raise PermissionError(errno.EACCES, e.strerror, path) from e raise # The CRT file descriptor takes ownership of the Win32 file handle. # os.O_TEXT is unnecessary because Python handles newline conversion. crt_file_descriptor = msvcrt.open_osfhandle(win32_file_handle.handle, os.O_RDONLY) win32_file_handle.Detach() # The Python file object takes ownership of the CRT file descriptor. Closing the Python # file object closes the underlying Win32 file handle. return os.fdopen(crt_file_descriptor, "r", encoding="utf-8-sig") else: return open(path, "r")