EthNmController

class mtf.network_port.nm_controller.EthNmController

Ethernet Network Management controller

__init__(channel_name: str)
get_nm_signal(signal_name: str, update_nm_signal_data=None)

Get network management signal value

Args:

signal_name (UdpNmSignals): NM signal name update_nm_signal_data(Function): Update NM signal data

Returns:

Union[int, List[int]]: requested signal value.

static __new__(cls, *args, **kwargs)

Initialize a new instance of the ControllerBase.

The instance will be automatically added to the list of controllers.

callback_(data_frame: BusEvent)
clean_up()
classmethod controllers_cleanup()

Clean up all controller instances.

Calls the clean_up() method for each instance.

get_message_queue() list[dict[str, Any]]

Get messages queue :return: :rtype:

get_queue() list[dict[str, Any]]

Get the queue

get_timestamp_queue() list[int]
register_callback(callback_fnc: Callable[[IEthernetProtocol], None])
Parameters:

callback_fnc – callback function with parameter of type MessageEthernet

Setting a callback
# ...
from andisdk import MessageEthernet
# ...

def eth_listener_callback(msg: MessageEthernet):
    # do things

# ... creating listener 
# register our callback
listener.register_callback(eth_listener_callback)

Warning

The callback function should be lightweight without logs. Otherwise, it will reduce the performance dramatically. The object that is passed to callback functions is mutable. So the user must be careful. In case of silent mode, the callback functions will be invoked only when getting the queue.

reset() None

cleanup the queue

start(channel=None, keep_cache=False, silent_mode: bool = False)
start_listening(keep_cache=False, silent_mode: bool = False) bool
stop()
stop_listening() bool
unregister_callback(callback_fnc: Callable[[IEthernetProtocol], None])
Parameters:

callback_fnc – callback function with parameter of type MessageEthernet

Remove a callback
# ...
from andisdk import MessageEthernet
# ...

def eth_listener_callback(msg: MessageEthernet):
    # do things

# ... creating listener 
# ... do work
# unregister our callback
listener.unregister_callback(eth_listener_callback)
adapt_queue_elements() Queue[BaseEvent]

ensure the object.timestamp, object.payload