Source code for ni_measurementlink_service.measurement.service

"""Framework to host measurement service."""

from __future__ import annotations

import json
from os import path
from pathlib import Path
from threading import Lock
from typing import Any, Callable, Dict, List, Optional, TypeVar

import grpc

from ni_measurementlink_service._internal import grpc_servicer
from ni_measurementlink_service._internal.discovery_client import DiscoveryClient
from ni_measurementlink_service._internal.parameter import (
    metadata as parameter_metadata,
)
from ni_measurementlink_service._internal.service_manager import GrpcService
from ni_measurementlink_service.measurement.info import (
    DataType,
    MeasurementInfo,
    ServiceInfo,
    TypeSpecialization,
)


[docs]class MeasurementContext: """Proxy for the Measurement Service's context-local state.""" @property def grpc_context(self): """Get the context for the RPC.""" return grpc_servicer.measurement_service_context.get().grpc_context @property def pin_map_context(self): """Get the pin map context for the RPC.""" return grpc_servicer.measurement_service_context.get().pin_map_context
[docs] def add_cancel_callback(self, cancel_callback: Callable): """Add a callback which is invoked when the RPC is canceled.""" grpc_servicer.measurement_service_context.get().add_cancel_callback(cancel_callback)
[docs] def cancel(self): """Cancel the RPC.""" grpc_servicer.measurement_service_context.get().cancel()
@property def time_remaining(self): """Get the time remaining for the RPC.""" return grpc_servicer.measurement_service_context.get().time_remaining
[docs] def abort(self, code, details): """Aborts the RPC.""" grpc_servicer.measurement_service_context.get().abort(code, details)
# Eventually, these can be replaced with typing.Self (Python >= 3.11). _TGrpcChannelPool = TypeVar("_TGrpcChannelPool", bound="GrpcChannelPool") _TMeasurementService = TypeVar("_TMeasurementService", bound="MeasurementService")
[docs]class GrpcChannelPool(object): """Class that manages gRPC channel lifetimes.""" def __init__(self): """Initialize the GrpcChannelPool object.""" self._lock: Lock = Lock() self._channel_cache: Dict[str, grpc.Channel] = {}
[docs] def __enter__(self: _TGrpcChannelPool) -> _TGrpcChannelPool: """Enter the runtime context of the GrpcChannelPool.""" return self
[docs] def __exit__(self, exc_type, exc_value, traceback): """Exit the runtime context of the GrpcChannelPool.""" self.close()
[docs] def get_channel(self, target: str) -> grpc.Channel: """Return a gRPC channel. Args: target (str): The server address """ new_channel = None with self._lock: if target not in self._channel_cache: self._lock.release() new_channel = grpc.insecure_channel(target) self._lock.acquire() if target not in self._channel_cache: self._channel_cache[target] = new_channel new_channel = None channel = self._channel_cache[target] # Close new_channel if it was not stored in _channel_cache. if new_channel is not None: new_channel.close() return channel
[docs] def close(self) -> None: """Close channels opened by get_channel().""" with self._lock: for channel in self._channel_cache.values(): channel.close() self._channel_cache.clear()
[docs]class MeasurementService: """Class that supports registering and hosting a python function as a gRPC service. Attributes ---------- measurement_info (info.MeasurementInfo): Measurement info service_info(info.ServiceInfo) : Service Info configuration_parameter_list (List): List of configuration parameters. output_parameter_list (list): List of output parameters. measure_function (Callable): Registered measurement function. context (MeasurementContext): Accessor for context-local state. discovery_client (DiscoveryClient): Client for accessing the MeasurementLink discovery service. channel_pool (GrpcChannelPool): Pool of gRPC channels used by the service. """ def __init__( self, service_config_path: Path, version: str, ui_file_paths: List[Path], service_class: Optional[str] = None, ) -> None: """Initialize the Measurement Service object. Uses the specified .serviceconfig file, version, and UI file paths to initialize a Measurement Service object. Args: service_config_path (Path): Path to the .serviceconfig file. version (str): Version of the measurement service. ui_file_paths (List[Path]): List of paths to supported UIs. service_class (str): The service class from the .serviceconfig to use. Default value is None, which will use the first service in the .serviceconfig file. """ if not path.exists(service_config_path): raise RuntimeError(f"File does not exist. {service_config_path}") with open(service_config_path) as service_config_file: service_config = json.load(service_config_file) if service_class is None: service = next(iter(service_config["services"]), None) else: service = next( (s for s in service_config["services"] if s["serviceClass"] == service_class), None ) if not service: raise RuntimeError( f"Service class '{service_class}' not found in '{service_config_file}'" ) self.measurement_info = MeasurementInfo( display_name=service["displayName"], version=version, ui_file_paths=ui_file_paths, ) self.service_info = ServiceInfo( service_class=service["serviceClass"], description_url=service["descriptionUrl"], ) self.configuration_parameter_list: list = [] self.output_parameter_list: list = [] self.grpc_service = GrpcService() self.context: MeasurementContext = MeasurementContext() self.discovery_client: DiscoveryClient = self.grpc_service.discovery_client self.channel_pool: GrpcChannelPool = GrpcChannelPool()
[docs] def register_measurement(self, measurement_function: Callable) -> Callable: """Register a function as the measurement function for a measurement service. To declare a measurement function, use this idiom: ``` @measurement_service.register_measurement @measurement_service.configuration("Configuration 1", ...) @measurement_service.configuration("Configuration 2", ...) @measurement_service.output("Output 1", ...) @measurement_service.output("Output 2", ...) def measure(configuration1, configuration2): ... return (output1, output2) ``` See also: :func:`.configuration`, :func:`.output` """ self.measure_function = measurement_function return measurement_function
[docs] def configuration( self, display_name: str, type: DataType, default_value: Any, *, instrument_type: str = "" ) -> Callable: """Add a configuration parameter to a measurement function. This decorator maps the measurement service's configuration parameters to Python positional parameters. To add multiple configuration parameters to the same measurement function, use this decorator multiple times. The order of decorator calls must match the order of positional parameters. See also: :func:`.register_measurement` Args ---- display_name (str): Display name of the configuration. type (DataType): Data type of the configuration. default_value (Any): Default value of the configuration. instrument_type (str): Optional. Filter pins by instrument type. This is only supported when configuration type is DataType.Pin. Pin maps have built in instrument definitions using the NI driver based instrument type ids. These can be found as constants in `nims.session_management`. For example, for an NI DCPower instrument the instrument type is `nims.session_management.INSTRUMENT_TYPE_NI_DCPOWER`. For custom instruments the user defined instrument type id is defined in the pin map file. Returns ------- Callable: Callable that takes in Any Python Function and returns the same python function. """ grpc_field_type, repeated, type_specialization = type.value annotations = self._get_annotations(type_specialization, instrument_type) parameter = parameter_metadata.ParameterMetadata( display_name, grpc_field_type, repeated, default_value, annotations ) parameter_metadata.validate_default_value_type(parameter) self.configuration_parameter_list.append(parameter) def _configuration(func): return func return _configuration
[docs] def output(self, display_name: str, type: DataType) -> Callable: """Add a output parameter to a measurement function. This decorator maps the measurement service's output parameters to the elements of the tuple returned by the measurement function. To add multiple output parameters to the same measurement function, use this decorator multiple times. The order of decorator calls must match the order of elements returned by the measurement fuction. See also: :func:`.register_measurement` Args ---- display_name (str): Display name of the output. type (DataType): Data type of the output. Returns ------- Callable: Callable that takes in Any Python Function and returns the same python function. """ grpc_field_type, repeated, type_specialization = type.value parameter = parameter_metadata.ParameterMetadata( display_name, grpc_field_type, repeated, default_value=None, annotations={} ) self.output_parameter_list.append(parameter) def _output(func): return func return _output
[docs] def host_service(self) -> MeasurementService: """Host the registered measurement method as gRPC measurement service. Returns ------- MeasurementService: Context manager that can be used with a with-statement to close the service. Raises ------ Exception: If register measurement methods not available. """ if self.measure_function is None: raise Exception("Error, must register measurement method.") self.grpc_service.start( self.measurement_info, self.service_info, self.configuration_parameter_list, self.output_parameter_list, self.measure_function, ) return self
def _get_annotations( self, type_specialization: TypeSpecialization, instrument_type: str ) -> Dict[str, str]: annotations: Dict[str, str] = {} if type_specialization == TypeSpecialization.NoType: return annotations annotations["ni/type_specialization"] = type_specialization.value if type_specialization == TypeSpecialization.Pin: if instrument_type != "" or instrument_type is not None: annotations["ni/pin.instrument_type"] = instrument_type return annotations
[docs] def close_service(self) -> None: """Close the Service after un-registering with discovery service and cleanups.""" self.grpc_service.stop() self.channel_pool.close()
[docs] def __enter__(self: _TMeasurementService) -> _TMeasurementService: """Enter the runtime context related to the measurement service.""" return self
[docs] def __exit__(self, exc_type, exc_value, traceback): """Exit the runtime context related to the measurement service.""" self.close_service()
[docs] def get_channel(self, provided_interface: str, service_class: str = "") -> grpc.Channel: """Return gRPC channel to specified service. Args ---- provided_interface (str): The gRPC Full Name of the service. service_class (str): The service "class" that should be matched. Returns ------- grpc.Channel: A channel to the gRPC service. Raises ------ Exception: If service_class is not specified and there is more than one matching service registered. """ service_location = self.grpc_service.discovery_client.resolve_service( provided_interface, service_class ) return self.channel_pool.get_channel(target=service_location.insecure_address)