EthNmController
- class mtf.network_port.nm_controller.EthNmController
Ethernet Network Management controller
- __init__(channel_name: str)
- get_nm_signal(signal_name: str, update_nm_signal_data=None)
Get network management signal value
- Args:
signal_name (UdpNmSignals): NM signal name
update_nm_signal_data (Function): Update NM signal data
- Returns:
Union[int, List[int]]: requested signal value.
- static __new__(cls, *args, **kwargs)
Initialize a new instance of the ControllerBase.
The instance will be automatically added to the list of controllers.
- callback_(data_frame: BusEvent)
- clean_up()
- classmethod controllers_cleanup()
Clean up all controller instances.
Calls the clean_up() method for each instance.
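The registration behaviour described for __new__ and controllers_cleanup() can be illustrated with a minimal sketch. It does not reproduce the actual ControllerBase implementation: the class names ControllerBaseSketch and EthNmControllerSketch, the _instances list, the cleaned flag, and the channel names are all hypothetical stand-ins chosen for illustration.

```python
class ControllerBaseSketch:
    """Hypothetical stand-in; not the real ControllerBase."""

    # Assumed shared registry holding every controller that was created.
    _instances = []

    def __new__(cls, *args, **kwargs):
        # Create the object, then record it before __init__ runs.
        instance = super().__new__(cls)
        ControllerBaseSketch._instances.append(instance)
        return instance

    def __init__(self, channel_name):
        self.channel_name = channel_name
        self.cleaned = False  # illustrative flag only

    def clean_up(self):
        # A real controller would release its resources here.
        self.cleaned = True

    @classmethod
    def controllers_cleanup(cls):
        # Call clean_up() on each registered instance.
        for controller in ControllerBaseSketch._instances:
            controller.clean_up()


class EthNmControllerSketch(ControllerBaseSketch):
    """Hypothetical subclass; it inherits the registration behaviour."""


first = EthNmControllerSketch("eth_channel_a")
second = EthNmControllerSketch("eth_channel_b")
ControllerBaseSketch.controllers_cleanup()
```

Registering in __new__ rather than __init__ means a subclass that overrides __init__ without calling the parent initializer is still tracked, which is one plausible reason for this design.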
- get_message_queue() -> list[dict[str, Any]]
Get the message queue.
- get_queue() -> list[dict[str, Any]]
Get the queue
- get_timestamp_queue() -> list[int]
- register_callback(callback_fnc: Callable[[IEthernetProtocol], None])
- Parameters:
callback_fnc – callback function with parameter of type MessageEthernet
# ...
from andisdk import MessageEthernet
# ...
def eth_listener_callback(msg: MessageEthernet):
    # do things
# ... creating listener
# register our callback
listener.register_callback(eth_listener_callback)
Warning
Keep the callback function lightweight and avoid logging inside it; a slow callback reduces performance dramatically. The object passed to callback functions is mutable, so modify it with care. In silent mode, the callback functions are invoked only when the queue is retrieved.
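One way to follow this warning is to let the callback copy only the fields it needs into a buffer and to do the slow work later. The sketch below is an illustration under assumptions: FakeMessage is a hypothetical stand-in for the object passed to callbacks (its timestamp and payload fields are assumed, not taken from the andisdk API), and pending and process_pending are names invented for this example.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for the message object passed to callbacks."""
    timestamp: int
    payload: bytearray


# Bounded buffer; when full, the oldest entries are dropped automatically.
pending = deque(maxlen=1000)


def eth_listener_callback(msg):
    # Lightweight: no logging, no I/O. bytes() takes an immutable snapshot,
    # so later changes to the mutable message do not affect stored data.
    pending.append((msg.timestamp, bytes(msg.payload)))


def process_pending():
    """Run outside the callback, where logging and slow work are acceptable."""
    results = []
    while pending:
        timestamp, payload = pending.popleft()
        results.append((timestamp, payload.hex()))
    return results


message = FakeMessage(timestamp=100, payload=bytearray(b"\x01\x02"))
eth_listener_callback(message)
message.payload[0] = 0xFF  # mutating the original after the callback ran
processed = process_pending()
```

Because the callback stored a copy, processed still holds the payload as it was when the callback ran, not the mutated value.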
- reset() -> None
Clean up the queue.
- start(channel=None, keep_cache=False, silent_mode: bool = False)
- start_listening(keep_cache=False, silent_mode: bool = False) -> bool
- stop()
- stop_listening() -> bool
- unregister_callback(callback_fnc: Callable[[IEthernetProtocol], None])
- Parameters:
callback_fnc – callback function with parameter of type MessageEthernet
# ...
from andisdk import MessageEthernet
# ...
def eth_listener_callback(msg: MessageEthernet):
    # do things
# ... creating listener
# ... do work
# unregister our callback
listener.unregister_callback(eth_listener_callback)
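Pairing register_callback with unregister_callback can be made harder to forget by wrapping both in a context manager. This is only a sketch of that idea: FakeListener is a hypothetical stand-in that exposes just the two documented method names, and the registered helper is not part of the library.

```python
from contextlib import contextmanager


class FakeListener:
    """Hypothetical stand-in exposing only the two documented method names."""

    def __init__(self):
        self.callbacks = []

    def register_callback(self, callback_fnc):
        self.callbacks.append(callback_fnc)

    def unregister_callback(self, callback_fnc):
        self.callbacks.remove(callback_fnc)


@contextmanager
def registered(listener, callback_fnc):
    """Register on entry and always unregister on exit, even after an error."""
    listener.register_callback(callback_fnc)
    try:
        yield listener
    finally:
        listener.unregister_callback(callback_fnc)


def eth_listener_callback(msg):
    pass  # do things


listener = FakeListener()
with registered(listener, eth_listener_callback):
    active_during = len(listener.callbacks)  # callback is active here
active_after = len(listener.callbacks)  # callback has been removed
```

The try/finally inside the helper ensures the callback is removed even if the work inside the with block raises.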
- adapt_queue_elements() -> Queue[BaseEvent]
Ensure that each queue element exposes object.timestamp and object.payload.