EthNmController
- class mtf.network_port.nm_controller.EthNmController
Ethernet Network Management controller
- __init__(channel_name: str)
- get_nm_signal(signal_name: str, update_nm_signal_data=None)
Get network management signal value
- Args:
signal_name (UdpNmSignals): NM signal name update_nm_signal_data(Function): Update NM signal data
- Returns:
Union[int, List[int]]: requested signal value.
- static __new__(cls, *args, **kwargs)
Initialize a new instance of the ControllerBase.
The instance will be automatically added to the list of controllers.
- callback_(data_frame: BusEvent)
used as a callback function when an event or frame is received on the communication bus Args:
data_frame (BusEvent): The data frame
- clean_up()
- classmethod controllers_cleanup()
Clean up all controller instances.
Calls the clean_up() method for each instance.
- get_message_queue() list[dict[str, Any]]
Get messages queue :return: :rtype:
- get_queue() list[dict[str, Any]]
Return the queue containing the events or messages related to the communication bus Return:
Queue[BusEvent]
- get_timestamp_queue() list[int]
- register_callback(callback_fnc: Callable[[IEthernetProtocol], None])
- Parameters:
callback_fnc – callback function with parameter of type
MessageEthernet
# ... from andisdk import MessageEthernet # ... def eth_listener_callback(msg: MessageEthernet): # do things # ... creating listener # register our callback listener.register_callback(eth_listener_callback)
Warning
The callback function should be lightweight without logs. Otherwise, it will reduce the performance dramatically. The object that is passed to callback functions is mutable. So the user must be careful. In case of silent mode, the callback functions will be invoked only when getting the queue.
- reset() None
Reset the state of the bus listener by clearing its history and emptying the event queue
- start(channel=None, keep_cache=False, silent_mode: bool = False)
- start_listening(keep_cache=False, silent_mode: bool = False) bool
- stop()
- stop_listening() bool
Stop listening for events on a communication bus Return:
bool
- unregister_callback(callback_fnc: Callable[[IEthernetProtocol], None])
- Parameters:
callback_fnc – callback function with parameter of type
MessageEthernet
# ... from andisdk import MessageEthernet # ... def eth_listener_callback(msg: MessageEthernet): # do things # ... creating listener # ... do work # unregister our callback listener.unregister_callback(eth_listener_callback)
- adapt_queue_elements() Queue[BaseEvent]
ensure the object.timestamp, object.payload