Someip Sd Watcher Module

class mtf.network_port.someip_sd_watcher.EntryType

An enumeration.

FindService
OfferService
RequestService
RequestServiceAck
FindEventGroup
PublishEventGroup
SubscribeEventGroup
SubscribeEventGroupAck
Unknown
class mtf.network_port.someip_sd_watcher.SomeipSDWatcher

SomeipSDWatcher class

__init__(channel_name='', protocol=<mtf_eth_protocol_type.SOME_IP_SD: 4503599627370496>)

Initialization of all SomeipSDWatcher class attributes

checklist(someip_filter: {} = None)
check_filter_keys()
start_watching(sd_entry_filter: {} = None, sd_message_filter: {} = None, channel: str = None, silent_mode: bool = False)

start watching to the SOMEIP/SD bus.

Parameters:
  • sd_entry_filter (dict <keys:values>, optional) – list of attribute to filter on

  • sd_message_filter (dict<layer:filter> : filter<key:value>) – list filter message

Example:

watcher.start_watching(

sd_entry_filter={‘ttl’: 3, ‘instance_id’:0x0001}, sd_message_filter={‘ipv4_header’: {‘ip_address_source’: ‘192.168.0.1’}}

)

Returns:

True or false, executed successfully or not

start_listening(sd_entry_filter: {} = None, sd_message_filter: {} = None, channel: str = None, silent_mode: bool = False)

start watching to the SOMEIP/SD bus.

Parameters:
  • sd_entry_filter (dict <keys:values>, optional) – list of attribute to filter on

  • sd_message_filter (dict<layer:filter> : filter<key:value>) – list filter message

Example Filters
watcher.start_watching(
    sd_entry_filter={'ttl': 3, 'instance_id':0x0001},
    sd_message_filter={'ipv4_header': {'ip_address_source': '1192.168.0.1'}}
)
Returns:

True or false, executed successfully or not

stop_watching()

Stop watching to the SOMEIP/SD bus. :return: :rtype:

stop_listening()

Stop watching to the SOMEIP/SD bus. :return: :rtype:

get_messages()
get_sd_entries()
wait_for_sd_entry(entry_filter: list | None = None, timeout: int = 30, entry_counter: int = 0, remove_msg_filter: bool = True)
wait_for_someip_sd_message(message_filter: list | None = None, timeout: int = 30, message_counter: int = 0, remove_entry_filter: bool = True)
static __new__(cls, *args, **kwargs)

Initialize a new instance of the ControllerBase.

The instance will be automatically added to the list of controllers.

adapt_queue_elements() Queue[BaseEvent]

ensure the object.timestamp, object.payload

callback_(data_frame: BusEvent)
clean_up()
classmethod controllers_cleanup()

Clean up all controller instances.

Calls the clean_up() method for each instance.

get_message_queue() list[dict[str, Any]]

Get messages queue :return: :rtype:

get_queue() list[dict[str, Any]]

Get the queue

get_timestamp_queue() list[int]
register_callback(callback_fnc: Callable[[IEthernetProtocol], None])
Parameters:

callback_fnc – callback function with parameter of type MessageEthernet

Setting a callback
# ...
from andisdk import MessageEthernet
# ...

def eth_listener_callback(msg: MessageEthernet):
    # do things

# ... creating listener 
# register our callback
listener.register_callback(eth_listener_callback)

Warning

The callback function should be lightweight without logs. Otherwise, it will reduce the performance dramatically. The object that is passed to callback functions is mutable. So the user must be careful. In case of silent mode, the callback functions will be invoked only when getting the queue.

reset() None

cleanup the queue

start(channel=None, keep_cache=False, silent_mode: bool = False)
stop()
unregister_callback(callback_fnc: Callable[[IEthernetProtocol], None])
Parameters:

callback_fnc – callback function with parameter of type MessageEthernet

Remove a callback
# ...
from andisdk import MessageEthernet
# ...

def eth_listener_callback(msg: MessageEthernet):
    # do things

# ... creating listener 
# ... do work
# unregister our callback
listener.unregister_callback(eth_listener_callback)