Source code for ni_measurementlink_service._tracelogging

from __future__ import annotations

import ctypes
import sys
import uuid
from typing import Optional

try:
    import traceloggingdynamic

    _event_provider = traceloggingdynamic.Provider(b"NI-MeasurementLink-Python")
except ImportError:
    _event_provider = None

_LEVEL_LOG_ALWAYS = 0
_LEVEL_CRITICAL = 1
_LEVEL_ERROR = 3
_LEVEL_WARNING = 3
_LEVEL_INFO = 4
_LEVEL_VERBOSE = 5

_OPCODE_INFO = 0
_OPCODE_START = 1
_OPCODE_STOP = 2

_KEYWORD_NONE = 0
_KEYWORD_GRPC = 1 << 0

_TASK_GRPC_CLIENT_CALL = 1
_TASK_GRPC_SERVER_CALL = 2


if sys.platform == "win32":
    # 0x00000800 = LOAD_LIBRARY_SEARCH_SYSTEM32 (Win8 or later)
    _eventing_dll = ctypes.WinDLL("api-ms-win-eventing-provider-l1-1-0.dll", mode=0x00000800)

    _EventActivityIdControl = _eventing_dll.EventActivityIdControl
    _EventActivityIdControl.restype = ctypes.c_uint32
    _EventActivityIdControl.argtypes = (ctypes.c_uint32, ctypes.c_void_p)

    _EVENT_ACTIVITY_CTRL_GET_ID = 1
    _EVENT_ACTIVITY_CTRL_SET_ID = 2
    _EVENT_ACTIVITY_CTRL_CREATE_ID = 3
    _EVENT_ACTIVITY_CTRL_GET_SET_ID = 4
    _EVENT_ACTIVITY_CTRL_CREATE_SET_ID = 5

    def _create_activity_id() -> uuid.UUID:
        activity_bytes = (ctypes.c_byte * 16)()
        status = _EventActivityIdControl(
            _EVENT_ACTIVITY_CTRL_CREATE_ID, ctypes.pointer(activity_bytes)
        )
        if status != 0:
            raise OSError("EventActivityIdControl error", status)
        return uuid.UUID(bytes_le=bytes(activity_bytes))

    def _get_current_thread_activity_id() -> uuid.UUID:
        activity_bytes = (ctypes.c_byte * 16)()
        status = _EventActivityIdControl(
            _EVENT_ACTIVITY_CTRL_GET_ID, ctypes.pointer(activity_bytes)
        )
        if status != 0:
            raise OSError("EventActivityIdControl error", status)
        return uuid.UUID(bytes_le=bytes(activity_bytes))

else:

    def _create_activity_id() -> uuid.UUID:
        return uuid.uuid4()

    def _get_current_thread_activity_id() -> uuid.UUID:
        return uuid.UUID()


[docs] def is_enabled() -> bool: """Queries whether the event provider is enabled.""" return _event_provider and _event_provider.is_enabled()
[docs] def log_grpc_client_call_start(method_name: str) -> Optional[uuid.UUID]: """Log when starting a gRPC client call.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcClientCall", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_START, task=_TASK_GRPC_CLIENT_CALL, ) eb.add_str8(b"FormattedMessage", "gRPC client call starting: " + method_name) activity_id = _create_activity_id() related_activity_id = _get_current_thread_activity_id() _event_provider.write(eb, activity_id, related_activity_id) return activity_id else: return None
[docs] def log_grpc_client_call_stop(method_name: str, activity_id: Optional[uuid.UUID] = None) -> None: """Log when a gRPC client call has completed.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcClientCall", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_STOP, task=_TASK_GRPC_CLIENT_CALL, ) eb.add_str8(b"FormattedMessage", "gRPC client call complete: " + method_name) _event_provider.write(eb, activity_id)
[docs] def log_grpc_client_call_streaming_request(method_name: str) -> None: """Log when a gRPC client call is sending a client-streaming request.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcClientCallStreamingRequest", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_INFO, ) eb.add_str8(b"FormattedMessage", "gRPC client call streaming request: " + method_name) _event_provider.write(eb)
[docs] def log_grpc_client_call_streaming_response(method_name: str) -> None: """Log when a gRPC client call has received a server-streaming response.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcClientCallStreamingResponse", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_INFO, ) eb.add_str8(b"FormattedMessage", "gRPC client call streaming response: " + method_name) _event_provider.write(eb)
[docs] def log_grpc_server_call_start(method_name: str) -> Optional[uuid.UUID]: """Log when starting a gRPC server call.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcServerCall", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_START, task=_TASK_GRPC_SERVER_CALL, ) eb.add_str8(b"FormattedMessage", "gRPC server call starting: " + method_name) activity_id = _create_activity_id() related_activity_id = _get_current_thread_activity_id() _event_provider.write(eb, activity_id, related_activity_id) return activity_id else: return None
[docs] def log_grpc_server_call_stop(method_name: str, activity_id: Optional[uuid.UUID] = None) -> None: """Log when a gRPC server call has completed.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcServerCall", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_STOP, task=_TASK_GRPC_SERVER_CALL, ) eb.add_str8(b"FormattedMessage", "gRPC server call complete: " + method_name) _event_provider.write(eb, activity_id)
[docs] def log_grpc_server_call_streaming_request(method_name: str) -> None: """Log when a gRPC server call is sending a server-streaming request.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcServerCallStreamingRequest", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_INFO, ) eb.add_str8(b"FormattedMessage", "gRPC server call streaming request: " + method_name) _event_provider.write(eb)
[docs] def log_grpc_server_call_streaming_response(method_name: str) -> None: """Log when a gRPC server call has received a server-streaming response.""" if _event_provider and _event_provider.is_enabled(level=_LEVEL_INFO, keyword=_KEYWORD_GRPC): eb = traceloggingdynamic.EventBuilder() eb.reset( b"GrpcServerCallStreamingResponse", level=_LEVEL_INFO, keyword=_KEYWORD_GRPC, opcode=_OPCODE_INFO, ) eb.add_str8(b"FormattedMessage", "gRPC server call streaming response: " + method_name) _event_provider.write(eb)