Source code for ni_measurementlink_service._internal.parameter.serialization_strategy

"""Serialization Strategy."""

from __future__ import annotations

from typing import Any, Optional, cast

from google.protobuf import type_pb2
from google.protobuf.internal import decoder, encoder
from google.protobuf.message import Message

from ni_measurementlink_service._internal.parameter import _message
from ni_measurementlink_service._internal.parameter._serializer_types import (
    Decoder,
    DecoderConstructor,
    Encoder,
    EncoderConstructor,
    Key,
    PartialDecoderConstructor,
    PartialEncoderConstructor,
)
from ni_measurementlink_service._internal.stubs.ni.protobuf.types import xydata_pb2


def _scalar_encoder(encoder: EncoderConstructor) -> PartialEncoderConstructor:
    """Constructs a scalar encoder constructor.

    Takes a field index and returns an Encoder.

    This class returns the Encoder with is_repeated set to False
    and is_packed set to False.
    """

    def scalar_encoder(field_index: int) -> Encoder:
        is_repeated = False
        is_packed = False
        return encoder(field_index, is_repeated, is_packed)

    return scalar_encoder


def _vector_encoder(
    encoder: EncoderConstructor, is_packed: bool = True
) -> PartialEncoderConstructor:
    """Constructs a vector (array) encoder constructor.

    Takes a field index and returns an Encoder.

    This class returns the Encoder with is_repeated set to True
    and is_packed defaulting to True.
    """

    def vector_encoder(field_index: int) -> Encoder:
        is_repeated = True
        return encoder(field_index, is_repeated, is_packed)

    return vector_encoder


def _scalar_decoder(decoder: DecoderConstructor) -> PartialDecoderConstructor:
    """Constructs a scalar decoder constructor.

    Takes a field index and a key and returns a Decoder.

    This class returns the Decoder with is_repeated set to False
    and is_packed set to False.
    """

    def _unsupported_new_default(message: Optional[Message]) -> Any:
        raise NotImplementedError(
            "This function should not be called. Verify that you are using up-to-date and compatible versions of the ni-measurementlink-service and protobuf packages."
        )

    def scalar_decoder(field_index: int, key: Key) -> Decoder:
        is_repeated = False
        is_packed = False
        return decoder(field_index, is_repeated, is_packed, key, _unsupported_new_default)

    return scalar_decoder


def _vector_decoder(
    decoder: DecoderConstructor, is_packed: bool = True
) -> PartialDecoderConstructor:
    """Constructs a vector (array) decoder constructor.

    Takes a field index and a key and returns a Decoder.

    This class returns the Decoder with is_repeated set to True
    and is_packed defaulting to True.
    """

    def _new_default(unused_message: Optional[Message] = None) -> Any:
        return []

    def vector_decoder(field_index: int, key: Key) -> Decoder:
        is_repeated = True
        return decoder(field_index, is_repeated, is_packed, key, _new_default)

    return vector_decoder


def _double_xy_data_decoder(
    decoder: DecoderConstructor, is_repeated: bool
) -> PartialDecoderConstructor:
    """Constructs a DoubleXYData decoder constructor.

    Takes a field index and a key and returns a Decoder for DoubleXYData.
    """

    def _new_default(unused_message: Optional[Message] = None) -> Any:
        return xydata_pb2.DoubleXYData()

    def message_decoder(field_index: int, key: Key) -> Decoder:
        is_packed = True
        return decoder(field_index, is_repeated, is_packed, key, _new_default)

    return message_decoder


# Cast works around this issue in typeshed
# https://github.com/python/typeshed/issues/10695
FloatEncoder = _scalar_encoder(cast(EncoderConstructor, encoder.FloatEncoder))
DoubleEncoder = _scalar_encoder(cast(EncoderConstructor, encoder.DoubleEncoder))
IntEncoder = _scalar_encoder(cast(EncoderConstructor, encoder.Int32Encoder))
UIntEncoder = _scalar_encoder(cast(EncoderConstructor, encoder.UInt32Encoder))
BoolEncoder = _scalar_encoder(encoder.BoolEncoder)
StringEncoder = _scalar_encoder(encoder.StringEncoder)
MessageEncoder = _scalar_encoder(cast(EncoderConstructor, _message._message_encoder_constructor))

FloatArrayEncoder = _vector_encoder(cast(EncoderConstructor, encoder.FloatEncoder))
DoubleArrayEncoder = _vector_encoder(cast(EncoderConstructor, encoder.DoubleEncoder))
IntArrayEncoder = _vector_encoder(cast(EncoderConstructor, encoder.Int32Encoder))
UIntArrayEncoder = _vector_encoder(cast(EncoderConstructor, encoder.UInt32Encoder))
BoolArrayEncoder = _vector_encoder(encoder.BoolEncoder)
StringArrayEncoder = _vector_encoder(encoder.StringEncoder, is_packed=False)
MessageArrayEncoder = _vector_encoder(
    cast(EncoderConstructor, _message._message_encoder_constructor)
)

# Cast works around this issue in typeshed
# https://github.com/python/typeshed/issues/10697
FloatDecoder = _scalar_decoder(cast(DecoderConstructor, decoder.FloatDecoder))
DoubleDecoder = _scalar_decoder(cast(DecoderConstructor, decoder.DoubleDecoder))
Int32Decoder = _scalar_decoder(cast(DecoderConstructor, decoder.Int32Decoder))
UInt32Decoder = _scalar_decoder(cast(DecoderConstructor, decoder.UInt32Decoder))
Int64Decoder = _scalar_decoder(cast(DecoderConstructor, decoder.Int64Decoder))
UInt64Decoder = _scalar_decoder(cast(DecoderConstructor, decoder.UInt64Decoder))
BoolDecoder = _scalar_decoder(cast(DecoderConstructor, decoder.BoolDecoder))
StringDecoder = _scalar_decoder(cast(DecoderConstructor, decoder.StringDecoder))
XYDataDecoder = _double_xy_data_decoder(_message._message_decoder_constructor, is_repeated=False)

FloatArrayDecoder = _vector_decoder(cast(DecoderConstructor, decoder.FloatDecoder))
DoubleArrayDecoder = _vector_decoder(cast(DecoderConstructor, decoder.DoubleDecoder))
Int32ArrayDecoder = _vector_decoder(cast(DecoderConstructor, decoder.Int32Decoder))
UInt32ArrayDecoder = _vector_decoder(cast(DecoderConstructor, decoder.UInt32Decoder))
Int64ArrayDecoder = _vector_decoder(cast(DecoderConstructor, decoder.Int64Decoder))
UInt64ArrayDecoder = _vector_decoder(cast(DecoderConstructor, decoder.UInt64Decoder))
BoolArrayDecoder = _vector_decoder(cast(DecoderConstructor, decoder.BoolDecoder))
StringArrayDecoder = _vector_decoder(
    cast(DecoderConstructor, decoder.StringDecoder), is_packed=False
)
XYDataArrayDecoder = _double_xy_data_decoder(
    _message._message_decoder_constructor, is_repeated=True
)


_FIELD_TYPE_TO_ENCODER_MAPPING = {
    type_pb2.Field.TYPE_FLOAT: (FloatEncoder, FloatArrayEncoder),
    type_pb2.Field.TYPE_DOUBLE: (DoubleEncoder, DoubleArrayEncoder),
    type_pb2.Field.TYPE_INT32: (IntEncoder, IntArrayEncoder),
    type_pb2.Field.TYPE_INT64: (IntEncoder, IntArrayEncoder),
    type_pb2.Field.TYPE_UINT32: (UIntEncoder, UIntArrayEncoder),
    type_pb2.Field.TYPE_UINT64: (UIntEncoder, UIntArrayEncoder),
    type_pb2.Field.TYPE_BOOL: (BoolEncoder, BoolArrayEncoder),
    type_pb2.Field.TYPE_STRING: (StringEncoder, StringArrayEncoder),
    type_pb2.Field.TYPE_ENUM: (IntEncoder, IntArrayEncoder),
    type_pb2.Field.TYPE_MESSAGE: (MessageEncoder, MessageArrayEncoder),
}

_FIELD_TYPE_TO_DECODER_MAPPING = {
    type_pb2.Field.TYPE_FLOAT: (FloatDecoder, FloatArrayDecoder),
    type_pb2.Field.TYPE_DOUBLE: (DoubleDecoder, DoubleArrayDecoder),
    type_pb2.Field.TYPE_INT32: (Int32Decoder, Int32ArrayDecoder),
    type_pb2.Field.TYPE_INT64: (Int64Decoder, Int64ArrayDecoder),
    type_pb2.Field.TYPE_UINT32: (UInt32Decoder, UInt32ArrayDecoder),
    type_pb2.Field.TYPE_UINT64: (UInt64Decoder, UInt64ArrayDecoder),
    type_pb2.Field.TYPE_BOOL: (BoolDecoder, BoolArrayDecoder),
    type_pb2.Field.TYPE_STRING: (StringDecoder, StringArrayDecoder),
    type_pb2.Field.TYPE_ENUM: (Int32Decoder, Int32ArrayDecoder),
}

_TYPE_DEFAULT_MAPPING = {
    type_pb2.Field.TYPE_FLOAT: float(),
    type_pb2.Field.TYPE_DOUBLE: float(),
    type_pb2.Field.TYPE_INT32: int(),
    type_pb2.Field.TYPE_INT64: int(),
    type_pb2.Field.TYPE_UINT32: int(),
    type_pb2.Field.TYPE_UINT64: int(),
    type_pb2.Field.TYPE_BOOL: bool(),
    type_pb2.Field.TYPE_STRING: str(),
    type_pb2.Field.TYPE_ENUM: int(),
}

_MESSAGE_TYPE_TO_DECODER = {
    xydata_pb2.DoubleXYData.DESCRIPTOR.full_name: XYDataDecoder,
}

_ARRAY_MESSAGE_TYPE_TO_DECODER = {
    xydata_pb2.DoubleXYData.DESCRIPTOR.full_name: XYDataArrayDecoder,
}


[docs] def get_encoder(type: type_pb2.Field.Kind.ValueType, repeated: bool) -> PartialEncoderConstructor: """Get the appropriate partial encoder constructor for the specified type. A scalar or vector constructor is returned based on the 'repeated' parameter. """ if type not in _FIELD_TYPE_TO_ENCODER_MAPPING: raise ValueError(f"Error can not encode type '{type}'") scalar, array = _FIELD_TYPE_TO_ENCODER_MAPPING[type] if repeated: return array return scalar
[docs] def get_decoder( type: type_pb2.Field.Kind.ValueType, repeated: bool, message_type: str = "" ) -> PartialDecoderConstructor: """Get the appropriate partial decoder constructor for the specified type.""" decoder_mapping = _FIELD_TYPE_TO_DECODER_MAPPING.get(type) if decoder_mapping is not None: scalar_decoder, array_decoder = decoder_mapping return array_decoder if repeated else scalar_decoder elif type == type_pb2.Field.Kind.TYPE_MESSAGE: if repeated: decoder = _ARRAY_MESSAGE_TYPE_TO_DECODER.get(message_type) else: decoder = _MESSAGE_TYPE_TO_DECODER.get(message_type) if decoder is None: raise ValueError(f"Unknown message type '{message_type}'") return decoder else: raise ValueError(f"Error can not decode type '{type}'")
[docs] def get_type_default(type: type_pb2.Field.Kind.ValueType, repeated: bool) -> Any: """Get the default value for the give type.""" if repeated: return list() type_default_value = _TYPE_DEFAULT_MAPPING.get(type) return type_default_value