Someip Controller Module
SomeipListener Module.
- class mtf.network_port.someip_controller.SomeipListener
- __init__(channel_name: str = 'ETH-SOMEIP', protocol: mtf.proto_common.EthProtocolType | mtf.libs.mtf_pybinder.mtf_eth_filter | mtf.libs.mtf_pybinder.mtf_someip_filter = <mtf_eth_protocol_type.SOME_IP: 2251799813685248>) None
- configure(someip_filter: mtf.libs.mtf_pybinder.mtf_someip_filter | None = None, listener_type: mtf.libs.mtf_pybinder.SomeIpListenerType = <SomeIpListenerType.GetSeparateFrames: 2>) None
- start_listening(channel=None, keep_cache=False, callback_fnc=None, someip_filter: dict | None = None, lazy_mode=None, silent_mode: bool = False) bool
- start(channel=None, keep_cache=False, callback_fnc=None, someip_filter: dict | None = None, lazy_mode=None, silent_mode: bool = False) bool
- stop_listening() bool
- reset() None
Clean up the queue.
- get_someip_timestamp_queue() list[int]
- build_messages_queue() list[dict[str, Any]]
- static __new__(cls, *args, **kwargs)
Initialize a new instance of the ControllerBase.
The instance will be automatically added to the list of controllers.
- clean_up()
- classmethod controllers_cleanup()
Clean up all controller instances.
Calls the clean_up() method for each instance.
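The registration and bulk clean-up behaviour described above can be illustrated with a minimal sketch. This is not the mtf implementation: the class name ControllerBaseSketch, the _instances list, and the cleaned flag are hypothetical and only mirror the documented behaviour of __new__, clean_up(), and controllers_cleanup().

```python
class ControllerBaseSketch:
    """Hypothetical stand-in for illustration; not the real mtf ControllerBase."""

    # Assumed storage for "the list of controllers" mentioned in the docstring.
    _instances: list = []

    def __new__(cls, *args, **kwargs):
        # Create the instance and register it automatically.
        instance = super().__new__(cls)
        ControllerBaseSketch._instances.append(instance)
        return instance

    def __init__(self, name: str = "controller"):
        self.name = name
        self.cleaned = False  # hypothetical flag, used only to observe clean_up()

    def clean_up(self):
        # A real controller would release its resources here.
        self.cleaned = True

    @classmethod
    def controllers_cleanup(cls):
        # Call clean_up() on every registered instance, including subclasses.
        for instance in ControllerBaseSketch._instances:
            instance.clean_up()
```

Because registration happens in __new__, subclasses are tracked even if they override __init__ without calling the base initializer.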
- get_message_queue() list[dict[str, Any]]
Get the message queue.
- get_queue() list[dict[str, Any]]
Get the queue.
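As a rough illustration of how the methods listed so far might be combined, the sketch below wraps a capture window in a helper. The helper name capture_messages and its duration_s parameter are hypothetical. It also assumes that start_listening() returns False on failure and that calling reset() after reading the queue is acceptable; neither is confirmed by this reference. Only the documented methods start_listening(), get_queue(), stop_listening(), and reset() are called on the listener.

```python
import time
from typing import Any


def capture_messages(listener, duration_s: float = 1.0) -> list[dict[str, Any]]:
    """Listen for duration_s seconds and return a copy of the captured queue.

    `listener` is expected to behave like SomeipListener (duck typing).
    """
    # Assumption: a False return value means the listener did not start.
    if not listener.start_listening():
        raise RuntimeError("listener failed to start")
    try:
        time.sleep(duration_s)
        # Copy the list so a later reset() cannot empty the returned result.
        return list(listener.get_queue())
    finally:
        # Always stop and clean up, even if reading the queue raised.
        listener.stop_listening()
        listener.reset()
```

The try/finally block ensures the listener is stopped even when an exception interrupts the capture.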
- get_timestamp_queue() list[int]
- register_callback(callback_fnc: Callable[[IEthernetProtocol], None])
- Parameters:
callback_fnc – callback function with a parameter of type MessageEthernet
    # ...
    from andisdk import MessageEthernet
    # ...
    def eth_listener_callback(msg: MessageEthernet):
        # do things
    # ... creating listener
    # register our callback
    listener.register_callback(eth_listener_callback)
Warning
Keep the callback function lightweight and avoid logging inside it; otherwise performance drops dramatically. The object passed to callback functions is mutable, so take care not to modify it unintentionally. In silent mode, the callback functions are invoked only when the queue is retrieved.
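One way to follow this warning is to have the callback copy only the fields it needs into a buffer and defer all heavy work (logging, parsing, assertions) until later. The sketch below assumes the message exposes message_id and payload attributes; these names are hypothetical and are not taken from the MessageEthernet API, so substitute the attributes the real object provides.

```python
from collections import deque

# Buffer filled by the callback and drained outside of it.
pending = deque()


def eth_listener_callback(msg):
    # Keep this cheap: no logging, no I/O, no heavy computation.
    # Copy values instead of storing `msg`, because the object is mutable.
    # `message_id` and `payload` are assumed attribute names.
    pending.append({
        "message_id": msg.message_id,
        "payload": bytes(msg.payload),  # immutable snapshot of the payload
    })


def process_pending():
    """Drain the buffer; heavy work such as logging belongs here."""
    results = []
    while pending:
        results.append(pending.popleft())
    return results
```

Taking a bytes snapshot means later changes to the original message do not alter what was recorded.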
- stop()
- unregister_callback(callback_fnc: Callable[[IEthernetProtocol], None])
- Parameters:
callback_fnc – callback function with a parameter of type MessageEthernet
    # ...
    from andisdk import MessageEthernet
    # ...
    def eth_listener_callback(msg: MessageEthernet):
        # do things
    # ... creating listener
    # ... do work
    # unregister our callback
    listener.unregister_callback(eth_listener_callback)
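Since every register_callback() call is normally paired with an unregister_callback() call, the pairing can be made harder to forget with a small context manager. This is a sketch only: the helper name registered_callback is hypothetical and not part of mtf, and it relies solely on the two documented methods.

```python
from contextlib import contextmanager


@contextmanager
def registered_callback(listener, callback_fnc):
    """Register callback_fnc on entry and always unregister it on exit."""
    listener.register_callback(callback_fnc)
    try:
        yield callback_fnc
    finally:
        # Runs even if the body of the `with` block raises.
        listener.unregister_callback(callback_fnc)
```

A test would then wrap its work in `with registered_callback(listener, eth_listener_callback):` so the callback is removed even when the test fails.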
- mtf.network_port.someip_controller.create_someip_filter_from_andi_dict(someip_filter: dict) mtf_someip_filter