# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
from ni_measurementlink_service._internal.stubs import session_pb2 as session__pb2
[docs]
class SessionUtilitiesStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.EnumerateDevices = channel.unary_unary(
'/nidevice_grpc.SessionUtilities/EnumerateDevices',
request_serializer=session__pb2.EnumerateDevicesRequest.SerializeToString,
response_deserializer=session__pb2.EnumerateDevicesResponse.FromString,
)
self.Reserve = channel.unary_unary(
'/nidevice_grpc.SessionUtilities/Reserve',
request_serializer=session__pb2.ReserveRequest.SerializeToString,
response_deserializer=session__pb2.ReserveResponse.FromString,
)
self.IsReservedByClient = channel.unary_unary(
'/nidevice_grpc.SessionUtilities/IsReservedByClient',
request_serializer=session__pb2.IsReservedByClientRequest.SerializeToString,
response_deserializer=session__pb2.IsReservedByClientResponse.FromString,
)
self.Unreserve = channel.unary_unary(
'/nidevice_grpc.SessionUtilities/Unreserve',
request_serializer=session__pb2.UnreserveRequest.SerializeToString,
response_deserializer=session__pb2.UnreserveResponse.FromString,
)
self.ResetServer = channel.unary_unary(
'/nidevice_grpc.SessionUtilities/ResetServer',
request_serializer=session__pb2.ResetServerRequest.SerializeToString,
response_deserializer=session__pb2.ResetServerResponse.FromString,
)
[docs]
class SessionUtilitiesServicer(object):
"""Missing associated documentation comment in .proto file."""
[docs]
def EnumerateDevices(self, request, context):
"""Provides a list of devices or chassis connected to server under localhost
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
[docs]
def Reserve(self, request, context):
"""Reserve a set of client defined resources for exclusive use
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
[docs]
def IsReservedByClient(self, request, context):
"""Determines if a set of client defined resources is currently reserved by a
specific client
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
[docs]
def Unreserve(self, request, context):
"""Unreserves a previously reserved resource
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
[docs]
def ResetServer(self, request, context):
"""Resets the server to a default state with no open sessions
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
[docs]
def add_SessionUtilitiesServicer_to_server(servicer, server):
rpc_method_handlers = {
'EnumerateDevices': grpc.unary_unary_rpc_method_handler(
servicer.EnumerateDevices,
request_deserializer=session__pb2.EnumerateDevicesRequest.FromString,
response_serializer=session__pb2.EnumerateDevicesResponse.SerializeToString,
),
'Reserve': grpc.unary_unary_rpc_method_handler(
servicer.Reserve,
request_deserializer=session__pb2.ReserveRequest.FromString,
response_serializer=session__pb2.ReserveResponse.SerializeToString,
),
'IsReservedByClient': grpc.unary_unary_rpc_method_handler(
servicer.IsReservedByClient,
request_deserializer=session__pb2.IsReservedByClientRequest.FromString,
response_serializer=session__pb2.IsReservedByClientResponse.SerializeToString,
),
'Unreserve': grpc.unary_unary_rpc_method_handler(
servicer.Unreserve,
request_deserializer=session__pb2.UnreserveRequest.FromString,
response_serializer=session__pb2.UnreserveResponse.SerializeToString,
),
'ResetServer': grpc.unary_unary_rpc_method_handler(
servicer.ResetServer,
request_deserializer=session__pb2.ResetServerRequest.FromString,
response_serializer=session__pb2.ResetServerResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'nidevice_grpc.SessionUtilities', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
[docs]
class SessionUtilities(object):
"""Missing associated documentation comment in .proto file."""
[docs]
@staticmethod
def EnumerateDevices(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/nidevice_grpc.SessionUtilities/EnumerateDevices',
session__pb2.EnumerateDevicesRequest.SerializeToString,
session__pb2.EnumerateDevicesResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
[docs]
@staticmethod
def Reserve(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/nidevice_grpc.SessionUtilities/Reserve',
session__pb2.ReserveRequest.SerializeToString,
session__pb2.ReserveResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
[docs]
@staticmethod
def IsReservedByClient(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/nidevice_grpc.SessionUtilities/IsReservedByClient',
session__pb2.IsReservedByClientRequest.SerializeToString,
session__pb2.IsReservedByClientResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
[docs]
@staticmethod
def Unreserve(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/nidevice_grpc.SessionUtilities/Unreserve',
session__pb2.UnreserveRequest.SerializeToString,
session__pb2.UnreserveResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
[docs]
@staticmethod
def ResetServer(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/nidevice_grpc.SessionUtilities/ResetServer',
session__pb2.ResetServerRequest.SerializeToString,
session__pb2.ResetServerResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)