# Source code for ni_measurementlink_service._internal.service_manager

import logging
from typing import Callable, List, Optional

import grpc
from grpc.framework.foundation import logging_pool

from ni_measurementlink_service._internal.discovery_client import DiscoveryClient
from ni_measurementlink_service._internal.grpc_servicer import (
    MeasurementServiceServicer,
)
from ni_measurementlink_service._internal.parameter.metadata import ParameterMetadata
from ni_measurementlink_service._internal.stubs.ni.measurementlink.measurement.v1 import (
    measurement_service_pb2_grpc,
)
from ni_measurementlink_service.measurement.info import MeasurementInfo, ServiceInfo

# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)


class GrpcService:
    """Class that manages hosting the measurement as service and closing service.

    Attributes
    ----------
        discovery_client (DiscoveryClient): Client used to register and
            unregister the measurement service.
        server: The gRPC server created by ``start``. Not set until ``start``
            has been called.
        servicer (MeasurementServiceServicer): The servicer attached to the
            server. Not set until ``start`` has been called.
        port (str): The port the server was bound to. Set at the end of a
            successful ``start``.

    """

    def __init__(self, discovery_client: Optional[DiscoveryClient] = None) -> None:
        """Initialize Service Manager.

        Args:
        ----
            discovery_client (DiscoveryClient, optional): Instance of Discovery
                Client. Defaults to None, in which case a new
                ``DiscoveryClient()`` is created.

        """
        self.discovery_client = discovery_client or DiscoveryClient()

    def start(
        self,
        measurement_info: MeasurementInfo,
        service_info: ServiceInfo,
        configuration_parameter_list: List[ParameterMetadata],
        output_parameter_list: List[ParameterMetadata],
        measure_function: Callable,
    ) -> str:
        """Host a gRPC service with the registered measurement method.

        Args:
        ----
            measurement_info (MeasurementInfo): Measurement info

            service_info (ServiceInfo): Service info

            configuration_parameter_list (List): List of configuration parameters.

            output_parameter_list (List): List of output parameters.

            measure_function (Callable): Registered measurement function.

        Returns
        -------
            str: The port number of the server

        """
        self.server = grpc.server(
            logging_pool.pool(max_workers=10),
            options=[
                # -1 removes gRPC's default message size limits.
                ("grpc.max_receive_message_length", -1),
                ("grpc.max_send_message_length", -1),
            ],
        )
        self.servicer = MeasurementServiceServicer(
            measurement_info,
            configuration_parameter_list,
            output_parameter_list,
            measure_function,
        )
        measurement_service_pb2_grpc.add_MeasurementServiceServicer_to_server(
            self.servicer, self.server
        )
        # Port 0 asks for any free port; the bound port number is returned
        # and kept as a string for the rest of this class.
        port = str(self.server.add_insecure_port("[::]:0"))
        self.server.start()
        _logger.info("Measurement service hosted on port: %s", port)
        self.discovery_client.register_measurement_service(port, service_info, measurement_info)
        self.port = port
        return port

    def stop(self) -> None:
        """Close the Service after un-registering with discovery service and cleanups.

        The gRPC server is stopped even when un-registering raises; in that
        case the exception is propagated to the caller after the server has
        been asked to stop.
        """
        try:
            self.discovery_client.unregister_service()
        finally:
            # Always release the server, otherwise a failed un-registration
            # would leave it (and its worker pool) running. 5 is the grace
            # period, in seconds, granted to in-flight RPCs.
            self.server.stop(5)
        _logger.info("Measurement service closed.")