Test Features Controller Module

class mtf.test_features_sim.test_features_controller.TestFeaturesSim
__init__()
configure_simulation_controller(conf_param_value: str, config_schema_path: str, start_sim=True)

Configure simulation controller and start the simulations if start_sim is True.

Parameters:
  • conf_param_value – path to config file (env.conf file)

  • config_schema_path – the path to settings file (settings.json file)

  • start_sim – False: configure controllers only, without starting simulations. Defaults to True: configure controllers and start simulations.

Returns:

True if the simulation is configured. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> tfs_instance.configure_simulation_controller("path/to/env.conf", "path/to/settings.json", start_sim=True)

deconfigure_simulation_controller()

Deconfigure the simulation controller.

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> tfs_instance.deconfigure_simulation_controller()
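
Pairing configuration with deconfiguration in a context manager is one way to make sure the controller is released even when a test raises. The sketch below is an illustration only: simulation_session and FakeController are hypothetical names, and the stand-in class merely mimics the two method names documented above so that the block runs without MTF installed.

```python
from contextlib import contextmanager


@contextmanager
def simulation_session(controller, conf_path, schema_path, start_sim=True):
    """Configure the controller on entry and always deconfigure it on exit."""
    configured = controller.configure_simulation_controller(
        conf_path, schema_path, start_sim=start_sim
    )
    if not configured:
        raise RuntimeError("simulation controller could not be configured")
    try:
        yield controller
    finally:
        controller.deconfigure_simulation_controller()


class FakeController:
    """Hypothetical stand-in for TestFeaturesSim; it only records calls."""

    def __init__(self):
        self.calls = []

    def configure_simulation_controller(self, conf_param_value, config_schema_path, start_sim=True):
        self.calls.append(("configure", start_sim))
        return True

    def deconfigure_simulation_controller(self):
        self.calls.append(("deconfigure",))


fake = FakeController()
with simulation_session(fake, "path/to/env.conf", "path/to/settings.json") as ctrl:
    pass  # run checks against ctrl here
```

With a real TestFeaturesSim instance in place of FakeController, the same with-block would wrap the test body.
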
check_simulations_status()

Check the simulation status.

Returns:

True if all simulations are started. False otherwise.

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> tfs_instance.check_simulations_status()
get_all_simulations_name()

Retrieve all simulation names of ECUs from the CarModelSpecification.json file.

Returns:

List of simulation names of simulated ECUs.

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> result = tfs_instance.get_all_simulations_name()
send_diag_message(src: int, tar: int, payload: List) → bool

Send a diagnostic message over CAN.

Parameters:
  • src – source

  • tar – target

  • payload – payload to be sent (list[int])

Returns:

True if the CAN message was sent. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> src = 123
>>> tar = 456
>>> payload = [1, 2, 3, 4, 5]
>>> result = tfs_instance.send_diag_message(src, tar, payload)
configure_frame_buffer_max_size(max_size: int)

Set a maximum size for all frame buffers equal to max_size

Parameters:

max_size – maximum size

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> max_size = 1000  # Replace with the desired maximum size
>>> tfs_instance.configure_frame_buffer_max_size(max_size)

# Set a maximum size for all frame buffers equal to the specified max_size.

start_message_observer(message_path: str, on_change: bool = True, include_empty_frames: bool = False)

Start a listener that monitors a specific message_path. The values received for this message can be retrieved when the listener is stopped.

Parameters:
  • message_path – BusName::msgId

  • on_change – True to get all received values, else false. Defaults to True.

  • include_empty_frames – True to include frames with empty payload.

Returns:

True if message observer is started. False otherwise.

Example:
>>> result = TestFeaturesSim().start_message_observer("Channel::0x123", True, False)
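
Message paths (BusName::msgId) and signal paths (BusName::MsgId::SignalName) share the same '::' separator, so a malformed path can be caught before it reaches an observer. The helper below is a hypothetical convenience, not part of TestFeaturesSim. It only checks the number of non-empty segments and makes no assumption about what a valid bus name or message id looks like.

```python
def split_path(path, expected_parts):
    """Split a '::'-separated path and check it has the expected number of parts."""
    parts = path.split("::")
    if len(parts) != expected_parts or any(not part for part in parts):
        raise ValueError(
            f"expected {expected_parts} non-empty '::'-separated parts, got {path!r}"
        )
    return tuple(parts)


# Message path: two segments.
bus, msg_id = split_path("Channel::0x123", 2)

# Signal path: three segments.
sig_bus, sig_msg, signal_name = split_path("BusName::MsgId::SignalName", 3)
```
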
stop_message_observer(message_path: str, get_values: bool)

Stop a started listener for message_path. The values received for this message are returned if get_values is True.

Parameters:
  • message_path – BusName::msgId

  • get_values – True to get retrieved values. False otherwise (get empty list)

Returns:

List of (timestamp, payload) pairs.

Examples:
>>> values = TestFeaturesSim().stop_message_observer("Channel::0x123", False)
check_cyclic_timeout(message_path: str, cycle: int, cycle_deviation: int, timeout: int)
Check whether a message within message_path is cyclic for a specific timeout, with a cycle equal to cycle and a tolerance equal to cycle_deviation, in ms.

Note: Make sure a listener was already started for the target message.

Parameters:
  • message_path – BusName::MsgId

  • cycle – cycle of msg from NK

  • cycle_deviation – tolerance

  • timeout – delay to check the cyclicity

Returns:

True if the cycle was respected during the timeout. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.check_cyclic_timeout("Channel::0x123", 200, 20, 1000)
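
The cycle and cycle_deviation arguments can be read as a tolerance band around the expected gap between consecutive frames. The sketch below illustrates that reading on a plain list of timestamps. cycle_respected is a hypothetical helper, the timestamps are assumed to be in milliseconds, and MTF's actual evaluation may differ (for example in how it treats a buffer with fewer than two frames).

```python
def cycle_respected(timestamps_ms, cycle, cycle_deviation):
    """Return True if every gap between consecutive timestamps lies within
    cycle +/- cycle_deviation (all values in milliseconds)."""
    if len(timestamps_ms) < 2:
        # Assumption: with no interval to judge, the check fails.
        return False
    gaps = [later - earlier for earlier, later in zip(timestamps_ms, timestamps_ms[1:])]
    return all(abs(gap - cycle) <= cycle_deviation for gap in gaps)


on_time = [0, 200, 395, 610, 800]  # gaps: 200, 195, 215, 190
late = [0, 200, 450, 650]          # gaps: 200, 250, 200

on_time_ok = cycle_respected(on_time, 200, 20)
late_ok = cycle_respected(late, 200, 20)
```

Here on_time_ok is True because every gap is within 20 ms of 200 ms, while late_ok is False because of the 250 ms gap.
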
check_multiple_cyclic_messages_with_timeout(message_path: list[str], cycle: list[int], cycle_deviation: list[int], timeout: int)
Check whether each message in message_path is cyclic for a specific timeout, with a cycle equal to the corresponding entry of cycle and a tolerance equal to the corresponding entry of cycle_deviation, in ms.

Note: Make sure a listener was already started for the target message.

Parameters:
  • message_path – List of message paths, each in BusName::MsgId format

  • cycle – cycle of msg from NK

  • cycle_deviation – tolerance

  • timeout – delay to check the cyclicity

Returns:

True if the cycle was respected during the timeout. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.check_multiple_cyclic_messages_with_timeout(["Channel::0x123"], [200], [20], 1000)
check_cyclic_repetition(message_path: str, cycle: int, cycle_deviation: int, nbr_repetition: int, check_latest_values: bool = True)

Cyclic Repetition Check

Checks whether a message within message_path is cyclic or not for a specific nbr_repetition with a cycle equal to cycle and with tolerance equal to cycle_deviation.

Note: Make sure a listener was already started for the target message.

Parameters:
  • message_path – BusName::MsgId

  • cycle – cycle of msg from NK

  • cycle_deviation – tolerance

  • nbr_repetition – number of occurrences

  • check_latest_values – True to check only the last nbr_repetition messages, False to check all the time intervals. Defaults to True.

Returns:

True if the cycle was respected for nbr_repetition occurrences. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.check_cyclic_repetition("Channel::0x123", 200, 20, 3)
check_repetition(message_path: str, is_received: bool)

Check the reception or the non-reception of message_path based on is_received.

Note: Make sure a listener was already started for the target message

Parameters:
  • message_path – BusName::MsgId

  • is_received – True to check that the message was received, False to check that it was not received

Returns:

True if is_received is True and the message was received, or if is_received is False and the message was not received. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.check_repetition("Channel::0x123", True)
check_reception(message_path: str, is_received: bool)

Check the reception or the non-reception of message_path based on is_received.

Note: Make sure a listener was already started for the target message

Parameters:
  • message_path – BusName::MsgId

  • is_received – True to check that the message was received, False to check that it was not received

Returns:

True if is_received is True and the message was received, or if is_received is False and the message was not received. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.check_reception("Channel::0x123", True)
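
The return rule above reduces to a single equality between the observed reception state and the expected one. A minimal sketch, in which reception_check and was_received are hypothetical names used only for illustration (the real check reads the observer's buffer):

```python
def reception_check(was_received: bool, is_received: bool) -> bool:
    """True when the observed reception state matches the expected one."""
    return was_received == is_received


# All four combinations of observed and expected state.
truth_table = {
    (observed, expected): reception_check(observed, expected)
    for observed in (True, False)
    for expected in (True, False)
}
```
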
check_payload_byte(message_path: str, byte_indexes: List, bytes_values: List, bytes_mask: List, operation: LogicalOperation = LogicalOperation.ALL)

Verify that the bytes with indexes byte_indexes in message_path have the values bytes_values using bytes_mask.

Parameters:
  • message_path – BusName::MsgId

  • byte_indexes – byte indexes

  • bytes_values – bytes values

  • bytes_mask – bytes mask

    Note: byte_indexes, bytes_values and bytes_mask must have the same length.

  • operation – Defaults to LogicalOperation.ALL.

Returns:

True if bytes with indexes are verified. False otherwise.

Examples:
>>> from mtf.enum_types import LogicalOperation
>>> tfs_instance = TestFeaturesSim()
>>> message_path = "BusName::MsgId"
>>> byte_indexes = [0, 1, 2, 3, 4, 5, 6, 7]
>>> bytes_values = [0x1, 0x2, 0x3, 0x4, 0xF4, 0xF6, 0x05, 0x01]
>>> bytes_mask = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]
>>> operation = LogicalOperation.ALL
>>> result = tfs_instance.check_payload_byte(message_path, byte_indexes, bytes_values, bytes_mask, operation)
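
To make the roles of the three parallel lists concrete, the following sketch applies a per-byte mask before comparing. It is one plausible reading of the check, not MTF's implementation: payload_bytes_match is a hypothetical helper, the masking rule (mask applied to both sides) is an assumption, and require_all=False models a hypothetical "any" mode, since the text above only names LogicalOperation.ALL.

```python
def payload_bytes_match(payload, byte_indexes, bytes_values, bytes_mask, require_all=True):
    """Compare selected payload bytes with expected values under per-byte masks."""
    if not (len(byte_indexes) == len(bytes_values) == len(bytes_mask)):
        raise ValueError("byte_indexes, bytes_values and bytes_mask must have the same length")
    results = [
        (payload[index] & mask) == (value & mask)
        for index, value, mask in zip(byte_indexes, bytes_values, bytes_mask)
    ]
    return all(results) if require_all else any(results)


payload = [0x12, 0x34, 0xAB, 0xCD]

# Byte 0 must equal 0x12 exactly; only the high nibble of byte 2 is compared.
masked_match = payload_bytes_match(payload, [0, 2], [0x12, 0xA0], [0xFF, 0xF0])

# Byte 1 is 0x34, not 0x00, so requiring all bytes to match fails.
strict_match = payload_bytes_match(payload, [0, 1], [0x12, 0x00], [0xFF, 0xFF])
```
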
clear_message_buffer(message_path: str)

Clear message buffer of message_path.

Parameters:

message_path – BusName::MsgId

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.clear_message_buffer("BusName::MsgId")
start_signal_observer(signal_path: str, on_change: bool = True, is_physical: bool = False)

Start a listener for a specific signal_path. The values received for this signal can be retrieved when the listener is stopped.

Parameters:
  • signal_path – BusName::MsgId::SignalName

  • on_change – True to save a value only when it differs from the previously received one. False to save every received value, even if it is unchanged. Defaults to True.

  • is_physical – Defaults to False.

Returns:

True if the signal observer was started. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> tfs_instance.start_signal_observer("bus_name::0x123::signal_name")
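
The effect of on_change on what an observer records can be pictured with a small filter. This is a sketch of one reading of the on_change description above (consecutive duplicates are dropped when on_change is True). The record function is hypothetical and does not come from MTF.

```python
def record(received_values, on_change=True):
    """Return the values an observer would keep under the assumed on_change rule."""
    recorded = []
    for value in received_values:
        if on_change and recorded and recorded[-1] == value:
            continue  # unchanged value: skipped when recording on change only
        recorded.append(value)
    return recorded


received = [1, 1, 2, 2, 1]
changes_only = record(received, on_change=True)
everything = record(received, on_change=False)
```

Under this assumption, changes_only keeps three values (1, 2, 1) and everything keeps all five.
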
stop_signal_observer(signal_path: str, get_values: bool)

Stop the started listener for signal_path. If the get_values parameter is True, the recorded values are returned.

Parameters:
  • signal_path – BusName::MsgId::SignalName

  • get_values – True to get all received values, False otherwise

Returns:

List of (timestamp, signal value) pairs.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> tfs_instance.stop_signal_observer("BusName::MsgId::SignalName", True)
checkSignal(signal_path: str, signal_value: int, comparison: mtf.libs.mtf_pybinder.CompareOperation = CompareOperation.EQ, logical_operation: mtf.enum_types.LogicalOperation = LogicalOperation.ALL, mask: int = 0xFFFFFFFFFFFFFFFF)

Compare the last received signal value with the expected signal_value.

Note: Make sure a listener was already started for the target signal.

Parameters:
  • signal_path – BusName::MsgId::SignalName

  • signal_value – signal value

  • comparison – Defaults to CompareOperation.EQ.

  • logical_operation – Defaults to LogicalOperation.ALL.

  • mask – Defaults to 0xFFFFFFFFFFFFFFFF.

Returns:

True if the last received value of signal_path satisfies the comparison with signal_value. False otherwise.

Examples:
>>> from mtf.enum_types import CompareOperation
>>> tfs_instance = TestFeaturesSim()
>>> signal_path = "BusName::MsgId::SignalName"
>>> expected_value = 5
>>> comparison = CompareOperation.EQ
>>> result = tfs_instance.checkSignal(signal_path, expected_value, comparison)
monitor_always_signal(signal_path: str, signal_value: int, timeout: int, comparison: mtf.libs.mtf_pybinder.CompareOperation = CompareOperation.EQ, mask: int = 0xFFFFFFFFFFFFFFFF)

Monitor a signal during a timeout and assert that the signal value is equal to the given value.

Parameters:
  • signal_path – BusName::MsgId::SignalName

  • signal_value – signal value

  • timeout – period of time in ms.

  • comparison – Defaults to CompareOperation.EQ.

  • mask – Defaults to 0xFFFFFFFFFFFFFFFF.

Returns:

True if the expected value has been received for the given path before the timeout is elapsed. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> from mtf.enum_types import CompareOperation
>>> signal_path = "BusName::MsgId::SignalName"
>>> signal_value = 5
>>> timeout = 300
>>> comparison = CompareOperation.EQ
>>> mask = 0xFFFFFFFFFFFFFFFF
>>> result = tfs_instance.monitor_always_signal(signal_path, signal_value, timeout, comparison, mask)
monitor_happened_signal(signal_path: str, signal_value: int, timeout: int, comparison: mtf.libs.mtf_pybinder.CompareOperation = CompareOperation.EQ, mask: int = 0xFFFFFFFFFFFFFFFF)

Monitor a signal during a timeout.

Note: make sure a listener was already started.

Info: Unlike monitor_always_signal, this function returns once the signal is found, even before the timeout.

Parameters:
  • signal_path – BusName::MsgId::SignalName

  • signal_value – signal value

  • timeout – a period of time defined in ms.

  • comparison – Defaults to CompareOperation.EQ.

  • mask – Defaults to 0xFFFFFFFFFFFFFFFF.

Returns:

True if the expected value has been received before the timeout is elapsed. False otherwise.

Examples:
>>> from mtf.enum_types import CompareOperation
>>> tfs_instance = TestFeaturesSim()
>>> signal_path = "BusName::MsgId::SignalName"
>>> expected_value = 5
>>> timeout = 500
>>> comparison = CompareOperation.EQ
>>> mask = 0xFFFFFFFFFFFFFFFF
>>> tfs_instance.start_signal_observer(signal_path)
>>> result = tfs_instance.monitor_happened_signal(signal_path, expected_value, timeout, comparison, mask)
>>> tfs_instance.stop_signal_observer(signal_path, True)
>>> assert result
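
The difference between the "always" and "happened" monitors can be illustrated on a list of already recorded values: the first requires every sample to satisfy the comparison, the second only one. The functions below are hypothetical stand-ins that work on plain integers. They assume the mask is applied to the received value before comparing and that an empty recording fails the "always" check. Neither assumption is confirmed by the text above.

```python
import operator

FULL_MASK = 0xFFFFFFFFFFFFFFFF


def always_holds(samples, expected, compare=operator.eq, mask=FULL_MASK):
    """True if there is at least one sample and every sample satisfies the comparison."""
    return bool(samples) and all(compare(value & mask, expected) for value in samples)


def happened(samples, expected, compare=operator.eq, mask=FULL_MASK):
    """True as soon as one sample satisfies the comparison."""
    return any(compare(value & mask, expected) for value in samples)


samples = [5, 5, 7, 5]
always_result = always_holds(samples, 5)    # the 7 breaks the "always" condition
happened_result = happened(samples, 5)      # the first sample already matches

# With a low-nibble mask, 0x15 and 0x25 both compare equal to 5.
masked_result = always_holds([0x15, 0x25], 5, mask=0x0F)
```
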
monitor_always_signals_mixed(signals_paths: list[str], types: list[int], signals_value: list[str], timeout: int, comparison: list[int], masks: list[int])

Check the values of mixed (Ethernet and legacy) signals using the monitor-always check.

Parameters:
  • signals_paths – List of signal paths. Each signal path must be in BusName::MsgId::SignalName format.

  • types – List of signal types. 1 for Ethernet (someip or npdu) and 0 for Legacy signals.

  • signals_value – List of expected values for the signals.

  • timeout – Timeout in milliseconds for signals monitoring.

  • comparison – List of comparison operations, where each operation corresponds to a signal.

  • masks – List of masks to be applied to signals to mask out irrelevant bits. Default is no mask for all signals.

monitor_happened_signals_mixed(signals_paths: list[str], types: list[int], signals_value: list[str], timeout: int, comparison: list[int], masks: list[int])

Check the values of mixed (Ethernet and legacy) signals using the monitor-happened check.

Parameters:
  • signals_paths – List of signal paths. Each signal path must be in BusName::MsgId::SignalName format.

  • types – List of signal types. 1 for Ethernet (someip or npdu) and 0 for Legacy signals.

  • signals_value – List of expected values for the signals.

  • timeout – Timeout in milliseconds for signals monitoring.

  • comparison – List of comparison operations, where each operation corresponds to a signal.

  • masks – List of masks to be applied to signals to mask out irrelevant bits. Default is no mask for all signals.

monitor_always_signals(signals_paths: list[str], signals_values: list[int], timeout: int, masks: list[int], comparison: list[CompareOperation] | list[int])

Monitor the signal paths during a timeout and assert that the signal values are equal to the given values. Timeout is in milliseconds.

Parameters:
  • signals_paths – List of signal paths. Each signal path must be in BusName::MsgId::SignalName format.

  • signals_values – List of expected values for the signals.

  • timeout – Timeout in milliseconds for signals monitoring.

  • masks – List of masks to be applied to signals to mask out irrelevant bits.

  • comparison – List of comparison operations, where each operation corresponds to a signal.

Returns:

True if the received values are equal to the expected values.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> signal_paths = ["BusName::MsgId::SignalName", "BusName2::MsgId2::SignalName2", "BusName3::MsgId3::SignalName3"]
>>> signals_values = [5, 5, 5]
>>> timeout = 1000
>>> masks = [0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF]
>>> comparison = [0, 0, 0]
>>> result = tfs_instance.monitor_always_signals(signal_paths, signals_values, timeout, masks, comparison)
monitor_happened_signals(signals_paths: list[str], signals_values: list[int], timeout: int, masks: list[int], comparison: list[CompareOperation] | list[int])

Check whether at least one received value of each signal path satisfies the comparison relation with the corresponding entry of signals_values before the timeout elapses.

Note: Make sure a listener was already started for each signal_path.

Parameters:
  • signals_paths – List of signal paths. Each signal path must be in BusName::MsgId::SignalName format.

  • signals_values – List of expected values for the signals.

  • timeout – Timeout in milliseconds for signals monitoring.

  • masks – List of masks to be applied to signals to mask out irrelevant bits.

  • comparison – List of comparison operations, where each operation corresponds to a signal.

Returns:

True if, before the timeout expires, the received values are equal to the expected values.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> signal_list = ["BusName::MsgId::SignalName", "BusName2::MsgId2::SignalName2", "BusName3::MsgId3::SignalName3"]
>>> expected_values = [10, 10, 10]
>>> timeout = 1000
>>> masks = [0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF]
>>> comparison = [0, 0, 0]
>>> result = tfs_instance.monitor_happened_signals(signal_list, expected_values, timeout, masks, comparison)
stop_signals_observers(signals_paths: list[str])

Stop signal observers of each signal_path.

Note: Make sure a listener was already started for each signal_path

Parameters:

signals_paths – List of signal paths. Each signal path must be in BusName::MsgId::SignalName format.

Returns:

True if all observers have been stopped. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> signal_list = ["BusName::MsgId::SignalName", "BusName2::MsgId2::SignalName2", "BusName3::MsgId3::SignalName3"]
>>> result = tfs_instance.stop_signals_observers(signal_list)
clear_signal_buffer(signal_path: str)

Clear the signal buffer of signal_path.

Parameters:

signal_path – BusName::MsgId::SignalName

Returns:

True if signal buffer was cleared. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> signal_path = "BusName::MsgId::SignalName"  # Replace with the specific signal path
>>> result = tfs_instance.clear_signal_buffer(signal_path)
get_received_signal_values_timeout(signal_path: str, timeout: int)

Return a list of received values of signal_path within the specified timeout.

Note: Make sure a listener was already started for each signal_path.

Parameters:
  • signal_path – BusName::MsgId::SignalName

  • timeout – Timeout in milliseconds

Returns:

List of (timestamp, value).

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> signal_path = "BusName::MsgId::SignalName"  # Replace with the specific signal path
>>> timeout = 500  # Replace with the desired timeout in milliseconds
>>> received_values = tfs_instance.get_received_signal_values_timeout(signal_path, timeout)
get_latest_received_signal_values(signal_path: str, buffer_size: int)

Return a list of last buffer_size received values of signal_path.

Note: Make sure a listener was already started for each signal_path.

Parameters:
  • signal_path – BusName::MsgId::SignalName

  • buffer_size – buffer size

Returns:

List of (timestamp, value).

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> signal_path = "BusName::MsgId::SignalName"  # Replace with the specific signal path
>>> buffer_size = 10  # Replace with the desired buffer size
>>> latest_values = tfs_instance.get_latest_received_signal_values(signal_path, buffer_size)
# Retrieve a list of the last 'buffer_size' received values of signal_path.
configure_signal_buffer_max_size(max_buffer_size: int)

Set a maximum size for all signals buffers equal to max_buffer_size.

Parameters:

max_buffer_size – max buffer size to be set

Returns:

True if the buffer size was correctly configured.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> max_buffer_size = 1000  # Replace with the desired maximum buffer size
>>> result = tfs_instance.configure_signal_buffer_max_size(max_buffer_size)
# Set a maximum size for all signals buffers equal to the specified max_buffer_size.
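
A bounded buffer of (timestamp, value) pairs can be pictured with collections.deque and its maxlen argument. The text above does not say which entries MTF discards when a buffer is full, so the sketch assumes the oldest entry is evicted first. It models the idea only and does not call MTF.

```python
from collections import deque

# maxlen plays the role of max_buffer_size = 3 in this illustration.
buffer = deque(maxlen=3)

for timestamp, value in [(0, 1), (10, 2), (20, 3), (30, 4)]:
    buffer.append((timestamp, value))  # the oldest pair is dropped once full

buffered = list(buffer)

# Comparable to asking for the latest 2 received values.
latest_two = buffered[-2:]
```

After the fourth append, the pair (0, 1) is no longer in the buffer.
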
set_trigger_then_wait_reaction(signals_paths_to_set: list[str], values_to_set: list[str], timeout_to_set: int, signals_paths_to_check: list[str], values_to_check: list[str], timeout_to_check: int, compare_operations: list[int], masks: list[int])

Set network elements (triggers) over an Ethernet bus or legacy bus. Once all the sets are valid, the keyword starts checking the reception of the network elements (reaction) over an Ethernet bus, legacy bus, or IO channel. The keyword succeeds if all the triggers are set and all the reaction check conditions are met. Otherwise it fails. TODO: support IOChannels setting

Parameters:
  • signals_paths_to_set – List of signal/Ethernet paths to set.

  • values_to_set – List of values to set. Each entry is either a value or a (value,update_and_send) pair, e.g. ["1"], ["1,true"] or ["1,false"].

  • timeout_to_set – Maximum accepted delay, in ms, for the signal to be set on hardware (set is confirmed).

  • signals_paths_to_check – List of signal/Ethernet/IO channel paths to check.

  • values_to_check – List of signal/Ethernet/IO channel values to check.

  • timeout_to_check – Timeout, in ms, to monitor the response of the DUT.

  • compare_operations – List of compare operations between the received values and the values to check (defaults to equality operations).

  • masks – Masks applied to the received values to check (defaults to a list of 0xFFFFFFFFFFFFFFFF).

Returns:

True if all the sets are confirmed and all the checks are satisfied. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> tfs_instance.set_trigger_then_wait_reaction(["BusName::MessageID::PduName", "EcuName::ServiceID::InstanceID::messageID::MemberName"], ["1,false", "2,true"], 1500,
...     ["HW::IOChannelName::attributeName", "BusName::FrameId::SignalName"], ["10.1,2"], 1500, [1, 0], [0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF])
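
Each values_to_set entry is a string that carries either a bare value or a value followed by an update_and_send flag. The sketch below shows how such an entry could be split. parse_value_to_set is a hypothetical helper, and the default used when the flag is omitted (True here) is an assumption, since the text above does not state it.

```python
def parse_value_to_set(entry, default_update_and_send=True):
    """Split 'value' or 'value,flag' into a (value, update_and_send) tuple."""
    value, separator, flag = entry.partition(",")
    if not separator:
        # No flag given: fall back to the assumed default.
        return value.strip(), default_update_and_send
    flag = flag.strip().lower()
    if flag not in ("true", "false"):
        raise ValueError(f"update_and_send must be 'true' or 'false', got {flag!r}")
    return value.strip(), flag == "true"


parsed = [parse_value_to_set(entry) for entry in ["1", "1,true", "1,false"]]
```

The value is kept as a string, as in the values_to_set list itself.
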
start_io_channel_observer(signal_path: str, attribute_name: str)

Start a listener for a specific io channel. The received values can be retrieved when the listener is stopped.

Parameters:
  • signal_path – HW::channel_name

  • attribute_name – HW attribute type. One of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

Returns:

True if signal observer was started.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> tfs_instance.start_io_channel_observer("HW::channel_name", "Voltage")
stop_io_channel_observer(signal_path: str, attribute_name: str, get_values: bool)

Stop the started io listener for signal_path. The values received for this signal can be retrieved if get_values is True.

Parameters:
  • signal_path – HW::HwSignalName

  • attribute_name – HW attribute type. One of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • get_values – True to get retrieved values. False otherwise (get empty list)

Returns:

True if io signal observer is stopped. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> tfs_instance.stop_io_channel_observer("HW::HwSignalName", "Voltage", True)
check_io_channel(signal_path: str, attribute_name: str, signal_value: numpy.float64, comparison: mtf.libs.mtf_pybinder.CompareOperation = CompareOperation.EQ, logical_operation: mtf.enum_types.LogicalOperation = LogicalOperation.ALL, mask: int = 0xFFFFFFFFFFFFFFFF)

Check whether an io signal of type attribute_name is received as expected by comparing it with signal_value.

Parameters:
  • signal_path – HW::HwSignalName

  • attribute_name – HW attribute type. One of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • signal_value – expected hw value

  • comparison – Defaults to CompareOperation.EQ.

  • logical_operation – Defaults to LogicalOperation.ALL.

  • mask – Defaults to 0xFFFFFFFFFFFFFFFF.

Returns:

True if the last received value satisfies the comparison with the expected value signal_value. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> from mtf.enum_types import CompareOperation, LogicalOperation
>>> io_signal_path = "HW::HwSignalName"
>>> parameter_name = "Voltage"
>>> expected_value = 8.0
>>> compar = CompareOperation.EQ
>>> operation = LogicalOperation.ALL
>>> result = tfs_instance.check_io_channel(io_signal_path, parameter_name, expected_value, compar, operation)
monitor_always_io_channel(signal_path: str, attribute_name: str, signal_value: numpy.float64, timeout: int, comparison: mtf.libs.mtf_pybinder.CompareOperation = CompareOperation.EQ, mask: int = 0xFFFFFFFFFFFFFFFF)

Check that all values received for signal_path during a timeout satisfy the comparison relation with the provided signal_value.

Note: Make sure a listener was already started for the target io signal.

Parameters:
  • signal_path – HW::SignalHwName

  • attribute_name – HW attribute type. One of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • signal_value – signal value

  • timeout – Timeout in milliseconds

  • comparison – Defaults to CompareOperation.EQ.

  • mask – Defaults to 0xFFFFFFFFFFFFFFFF.

Returns:

True if the received value verifies the comparison with the expected value signal_value during the timeout. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> from mtf.enum_types import CompareOperation
>>> io_signal_path = "HW::SignalHwName"
>>> parameter_name = "Voltage"
>>> expected_value = 8
>>> timeout = 2000
>>> comparison = CompareOperation.EQ
>>> mask = 0xFFFFFFFFFFFFFFFF
>>> result = tfs_instance.monitor_always_io_channel(io_signal_path, parameter_name, expected_value, timeout, comparison, mask)
monitor_happened_io_channel(signal_path: str, attribute_name: str, signal_value: numpy.float64, timeout: int, comparison: mtf.libs.mtf_pybinder.CompareOperation = CompareOperation.EQ, mask: int = 0xFFFFFFFFFFFFFFFF)

Check whether at least one received value of signal_path satisfies the comparison relation with the provided signal_value before the timeout elapses.

Note: Make sure a listener was already started for the signal_path.

Info: Unlike monitor_always_io_channel, this function returns once the signal value is received, even before the timeout.

Parameters:
  • signal_path – HW::SignalHwName

  • attribute_name – HW attribute type. One of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • signal_value – signal value

  • timeout – Timeout in milliseconds

  • comparison – Defaults to CompareOperation.EQ.

  • mask – Defaults to 0xFFFFFFFFFFFFFFFF.

Returns:

True if, before the timeout elapses, a received io value satisfies the comparison with the expected one. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> io_signal_path = "HW::SignalHwName"
>>> parameter_name = "Voltage"
>>> expected_value = 11.0
>>> timeout = 2000
>>> result = tfs_instance.monitor_happened_io_channel(io_signal_path, parameter_name, expected_value, timeout)
clear_io_channel_buffer(signal_path: str, attribute_name: str)

Clear the buffer of signal_path.

Parameters:
  • signal_path – HW::SignalHwName

  • attribute_name – HW attribute type. One of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

Returns:

True if io channel buffer was cleared. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> signal_path = "HW::SignalHwName"  # Replace with the specific signal path
>>> attribute_name = "Voltage"  # Replace with the attribute name
>>> result = tfs_instance.clear_io_channel_buffer(signal_path, attribute_name)
# Clear the buffer of the specified signal_path for the given attribute_name.
monitor_always_io_channels(channels_paths: List[str] | None = None, attributes_names: List[str] | None = None, signal_values: List[float64] | None = None, timeout: int = 0, comparison_signs: List[CompareOperation] | None = None)

Check that all io values received for channels_paths during a timeout satisfy the comparison relation with the provided signal_values.

Note: Make sure a listener was already started for each channel_path.

Parameters:
  • channels_paths – List of channels paths. Each one must be in the HW::HwSignalName format.

  • attributes_names – List of HW attribute types. Each is one of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • signal_values – List of signal values.

  • timeout – Timeout in milliseconds

  • comparison_signs – List of comparison operations

Returns:

True if each entry of channels_paths satisfies the comparison with its corresponding expected value in signal_values during the timeout. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> io_list = ["HW::HwSignalName", "HW::HwSignalName2"]
>>> io_types = ["Voltage", "Voltage"]
>>> expected_values = [9.0, 9.0]
>>> timeout = 1000
>>> comparison = [2, 2]
>>> result = tfs_instance.monitor_always_io_channels(io_list, io_types, expected_values, timeout, comparison)
monitor_happened_io_channels(channels_paths: List[str] | None = None, attributes_names: List[str] | None = None, signal_values: List[float64] | None = None, timeout: int = 0, comparison_signs: List[CompareOperation] | None = None)

Check whether at least one received io value of each entry of channels_paths satisfies the comparison_signs relation with the corresponding entry of signal_values before the timeout elapses.

Note: Make sure a listener was already started for each channel_path.

Info: Unlike monitor_always_io_channels, this function returns once the signal values are received, even before the timeout.

Parameters:
  • channels_paths – List of channels paths. Each one must be in the HW::HwSignalName format.

  • attributes_names – List of HW attribute types. Each is one of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • signal_values – List of signal values.

  • timeout – Timeout in milliseconds.

  • comparison_signs – List of comparison operations.

Returns:

True if, before the timeout elapses, each entry of channels_paths satisfies the comparison with its corresponding expected value in signal_values. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> io_signal_path = ["HW::HwSignalName","HW::HwSignalName2"]
>>> parameter_name = ["Voltage","Voltage"]
>>> expected_value = [11.0, 5.0]
>>> timeout = 2000
>>> result = tfs_instance.monitor_happened_io_channels(io_signal_path, parameter_name, expected_value, timeout)
stop_io_channel_observers(channels_paths: List[str] | None = None, attributes_names: List[str] | None = None)

Stop the started listeners for each channel path.

Parameters:
  • channels_paths – List of channels paths. Each one must be in the HW::HwSignalName format.

  • attributes_names – List of HW attribute types. Each is one of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

Returns:

True if all the channels_paths are stopped.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> io_list = ["HW::HwSignalName", "HW::HwSignalName2"]
>>> io_types = ["Voltage", "Voltage"]
>>> tfs_instance.stop_io_channel_observers(io_list, io_types)

get_received_io_channel_values_timeout(signal_path: str, attribute_name: str, timeout: int)

Return a list of received io values of signal_path within the specified timeout.

Note: A listener on the signal_path should be already started.

Parameters:
  • signal_path – HW::SignalHwName

  • attribute_name – HW attribute type. One of: Voltage, Current, DutyCycle (only relevant for PWM; YAML field MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • timeout – Timeout in milliseconds.

Returns:

List of (timestamp, iovalue).

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> signal_path = "HW::SignalHwName"  # Replace with the specific signal path
>>> attribute_name = "Voltage"  # Replace with the attribute name
>>> timeout = 1000  # Replace with the desired timeout in milliseconds
>>> received_values = tfs_instance.get_received_io_channel_values_timeout(signal_path, attribute_name, timeout)

# Retrieve a list of received io values of signal_path within the specified timeout.
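
The returned list of (timestamp, iovalue) pairs can be post-processed in plain Python. The sketch below is illustrative only: `samples` is hand-written stand-in data, the numeric types are assumed, and `values_within_tolerance` is a hypothetical helper that is not part of TestFeaturesSim.

```python
from typing import List, Tuple

# (timestamp, iovalue) pairs; the exact numeric types are an assumption.
Sample = Tuple[float, float]


def values_within_tolerance(samples: List[Sample], expected: float, tolerance: float) -> bool:
    """Return True if at least one sample exists and every value is within tolerance."""
    if not samples:
        return False
    return all(abs(value - expected) <= tolerance for _, value in samples)


# Stand-in data; a real list would come from get_received_io_channel_values_timeout().
samples = [(1000, 11.02), (1010, 10.98), (1020, 11.01)]
print(values_within_tolerance(samples, expected=11.0, tolerance=0.05))
```

Treating an empty list as a failure is a design choice of this sketch: it avoids reporting success when no value was received at all.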

get_latest_received_io_channel_values(signal_path: str, attribute_name: str, buffer_size: int)

Get a list of the latest buffer_size received io values.

Note: A listener on the signal_path should be already started.

Parameters:
  • signal_path – HW::SignalHwName

  • attribute_name – HW attribute type, one of: Voltage, Current, DutyCycle (only relevant for PWM; yaml field –> MeasurementAttribute: PWM), Frequency, Power (only relevant for PS_Channel).

  • buffer_size – buffer size.

Returns:

List of (timestamp, iovalue).

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> signal_path = "HW::SignalHwName"  # Replace with the specific signal path
>>> attribute_name = "Voltage"  # Replace with the attribute name
>>> buffer_size = 10  # Replace with the desired buffer size
>>> latest_values = tfs_instance.get_latest_received_io_channel_values(signal_path, attribute_name, buffer_size)
# Retrieve a list of the latest 'buffer_size' received io signals for signal_path.
configure_io_channel_buffer_max_size(max_buffer_size: int)

Set a maximum size for all io channels buffers equal to max_buffer_size

Parameters:

max_buffer_size – max buffer size to be set.

Returns:

True if the buffer size was correctly configured.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> max_buffer_size = 1000  # Replace with the desired maximum buffer size
>>> result = tfs_instance.configure_io_channel_buffer_max_size(max_buffer_size)
# Set a maximum size for all io channels buffers equal to the specified max_buffer_size.
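
How the io channel buffers are bounded internally is not documented here. As a mental model only, a bounded buffer that keeps the newest entries behaves like `collections.deque` with `maxlen`. The names and data below are hypothetical.

```python
from collections import deque

max_buffer_size = 3
# Once the buffer is full, appending drops the oldest entry.
buffer = deque(maxlen=max_buffer_size)

for sample in [(0, 1.0), (10, 2.0), (20, 3.0), (30, 4.0)]:
    buffer.append(sample)

# Reading the latest N entries, analogous to asking for buffer_size = 2.
latest_two = list(buffer)[-2:]
print(list(buffer))
print(latest_two)
```

Under this model, a buffer_size larger than the configured maximum can never return more than max_buffer_size entries.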
set_model_element(model_path: str, param: str)

Execute the method of a specific controller given in model_path by reflection, using the list of parameters given in param.

Parameters:
  • model_path – controller name and method name to be invoked separated by :: (e.g. KeyLearningController::PressIdg).

  • param – list of parameters separated by a comma ‘,’ (e.g. tokenId1,Unlock,500).

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> model_path = "KeyLearningController::PressIdg"  # Replace with the specific controller and method name
>>> params = "tokenId1,Unlock,500"  # Replace with the list of parameters separated by commas
>>> result = tfs_instance.set_model_element(model_path, params)
# Execute the method of a specific controller given in model_path by reflection using the provided parameters.
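
To illustrate what "by reflection" means here, the sketch below resolves a Controller::Method string against a registry and calls the method with the comma-separated parameters. It is not the library's implementation: the `CONTROLLERS` registry, the `invoke` helper, and the stand-in `KeyLearningController` class are all hypothetical.

```python
class KeyLearningController:
    """Hypothetical stand-in controller, used only for this sketch."""

    def PressIdg(self, token_id: str, action: str, duration_ms: str) -> bool:
        print(f"PressIdg({token_id}, {action}, {duration_ms})")
        return True


# Hypothetical registry mapping controller names to instances.
CONTROLLERS = {"KeyLearningController": KeyLearningController()}


def invoke(model_path: str, param: str) -> bool:
    """Split 'Controller::Method' and 'a,b,c', then call the method by name."""
    controller_name, _, method_name = model_path.partition("::")
    controller = CONTROLLERS.get(controller_name)
    method = getattr(controller, method_name, None)
    if not callable(method):
        return False  # unknown controller or method
    try:
        return bool(method(*param.split(",")))
    except Exception:
        return False  # wrong argument count or a failure inside the method


result = invoke("KeyLearningController::PressIdg", "tokenId1,Unlock,500")
```

Note that every parameter arrives as a string in this sketch; any type conversion would be up to the invoked method.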
start_stop_cycle_eth_message(service_path: str, ResumeSending: bool)

Start or stop the sending of a service interface over Ethernet.

Parameters:
  • service_path – Path to the Ethernet message for starting or stopping the service.

  • ResumeSending – True to resume sending the service, False to stop it.

Returns:

True if the service is stopped (ResumeSending=False) or started (ResumeSending=True), False otherwise.

Sample values for service_path:

ECU_Name::ServiceId::InstanceId (to stop all Someip events or field notifiers given by serviceId).
ECU_Name::ServiceId::InstanceId::MessageID (to stop a specific Someip event or field notifier).

Note: MessageID is eventID or fieldID.

Examples:

>>> import time
>>> tfs_instance = TestFeaturesSim()
>>> tfs_instance.start_stop_cycle_eth_message("ECU_Name::0x123::0x1234::0x001", True)
>>> time.sleep(5)  # Sleep for 5 seconds
>>> tfs_instance.start_stop_cycle_eth_message("ECU_Name::0x123::0x1234::0x001", False)
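
Building the service_path by hand is error-prone, so a small formatting helper can be convenient. The following is a minimal sketch; `build_service_path` is a hypothetical helper, and the unpadded hex formatting is an assumption (the example above uses 0x001, so the expected padding should be checked against the actual configuration).

```python
from typing import Optional


def build_service_path(ecu_name: str, service_id: int, instance_id: int,
                       message_id: Optional[int] = None) -> str:
    """Join the path elements with '::', formatting the ids as hexadecimal."""
    parts = [ecu_name, f"0x{service_id:X}", f"0x{instance_id:X}"]
    if message_id is not None:
        # With a message id, the path addresses one specific event or field notifier.
        parts.append(f"0x{message_id:X}")
    return "::".join(parts)


print(build_service_path("ECU_Name", 0x123, 0x1234))
print(build_service_path("ECU_Name", 0x123, 0x1234, 0x1))
```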
set_ethernet_network(path: str, param: str)

Set the signal value of a specific member over someip.

Parameters:
  • path – the path of the member to be set, separated by :: (EcuName::ServiceId::InstanceId::MessageId::MemberName, e.g. “X_Y_Z::0x0123::0x0001::0x1::MemberName”).

  • param – list of parameters separated by a comma ‘,’ (e.g. 1,true)…

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "X_Y_Z::0x0123::0x0001::0x1::MemberName"  # Replace with the specific member path
>>> params = "1,true"  # Replace with the list of parameters separated by commas
>>> result = tfs_instance.set_ethernet_network(path, params)
# Set the signal value of a specific member over someip using the provided parameters.

Note: If the SomeIP member to be set belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size

set_ethernet_networks(paths: List[str], params: List[str])

Set the signal values of specific members over someip.

Parameters:
  • paths – list of paths of the members to be set, each separated by :: (e.g. [“X_Y_Z::0x0123::0x0001::0x1::MemberName”, “A_B_Z_C::0x0345::0x0002::0x3::MemberName”]).

  • params – list of parameter strings, each separated by a comma ‘,’ (e.g. [“1,true”, “2,false”]).

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> paths = ["X_Y_Z::0x0123::0x0001::0x1::MemberName", "A_B_Z_C::0x0345::0x0002::0x3::MemberName"]  # Replace with the specific member paths
>>> params = ["1,true", "2,false"]  # Replace with the list of parameters separated by commas
>>> result = tfs_instance.set_ethernet_networks(paths, params)
# Set the signal values of specific members over someip using the provided parameters.

Note: If the SomeIP member to be set belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size

check_ethernet_network(path: str, param: str)

Check the signal value of a specific member over someip.

Parameters:
  • path – the path of the member to be checked separated by :: (e.g. “X_Y_Z::0x1013::0x0001::0x1::MemberName”).

  • param – expected value to be checked.

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "X_Y_Z::0x1013::0x0001::0x1::MemberName"  # Replace with the specific member path
>>> expected_value = "true"  # Replace with the expected value to be checked
>>> result = tfs_instance.check_ethernet_network(path, expected_value)
# Check the signal value of a specific member over someip against the expected value.

Note: If the SomeIP member to be checked belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size

check_ethernet_network_with_mask(path: str, param: str, mask: int, comparison: CompareOperation)

Check the signal value of a specific member over someip.

Parameters:
  • path – the path of the member to be checked separated by :: (e.g. “X_Y_Z::0x1013::0x0001::0x1::MemberName”).

  • param – expected value to be checked.

  • mask – mask to be applied to the signal value.

  • comparison – the comparison operation to use (e.g. CompareOperation.EQ).

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "X_Y_Z::0x1013::0x0001::0x1::MemberName"  # Replace with the specific member path
>>> param = "1"
>>> comparison = CompareOperation.EQ
>>> mask = 0xFFFFFFFFFFFFFFFF
>>> result = tfs_instance.check_ethernet_network_with_mask(path, param, mask, comparison)
# Check the signal value of a specific member over someip against the expected value.

Note: If the SomeIP member to be checked belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size
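
The interplay of mask and comparison can be pictured with plain integers. The sketch below is an assumption about the semantics, not the documented behavior: it applies the mask to both the received and the expected value before comparing. The operation names in `OPERATIONS` are hypothetical stand-ins, since only CompareOperation.EQ appears in the example above.

```python
import operator

# Hypothetical operation names mapped to standard comparison functions.
OPERATIONS = {
    "EQ": operator.eq,
    "NE": operator.ne,
    "GT": operator.gt,
    "GE": operator.ge,
    "LT": operator.lt,
    "LE": operator.le,
}


def masked_compare(actual: int, expected: int, mask: int, operation: str) -> bool:
    """Apply the mask to both operands, then compare them."""
    return OPERATIONS[operation](actual & mask, expected & mask)


# Only the low byte is compared, so the differing high byte is ignored.
print(masked_compare(0xAB12, 0x0012, 0x00FF, "EQ"))
# With a full 16-bit mask the high byte takes part in the comparison.
print(masked_compare(0xAB12, 0x0012, 0xFFFF, "EQ"))
```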

start_ethernet_observer(path: str)

Start ethernet observer : someip and npdu.

Parameters:

path – the path of the message to be checked, separated by ::

  • SOMEIP: ecu_name::service_id::instance_id::message_id

  • NPDU: ecu_name::npdu_id

  • NPDU: ecu_name::npdu_name

Returns:

True if the observer is started. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "ecu_name::service_id::instance_id::message_id"  # Replace with the specific message path
>>> result = tfs_instance.start_ethernet_observer(path)
# Start the ethernet observer for the specified message path.
stop_ethernet_observer(path: str)

Stop ethernet observer : someip and npdu.

Parameters:

path – the path of the message to be checked, separated by ::

  • SOMEIP: ecu_name::service_id::instance_id::message_id

  • NPDU: ecu_name::npdu_id

  • NPDU: ecu_name::npdu_name

Returns:

True if the observer is stopped. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "ecu_name::service_id::instance_id::message_id"  # Replace with the specific message path
>>> result = tfs_instance.stop_ethernet_observer(path)
# Stop the ethernet observer for the specified message path.
clear_ethernet_buffer(path: str)

Clear ethernet buffer: someip and npdu.

Parameters:

path – the path of the message to be checked, separated by ::

  • SOMEIP: ecu_name::service_id::instance_id::message_id

  • NPDU: ecu_name::npdu_id

  • NPDU: ecu_name::npdu_name

Returns:

True if the buffer is cleared.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "ecu_name::service_id::instance_id::message_id"  # Replace with the specific message path
>>> result = tfs_instance.clear_ethernet_buffer(path)
# Clear the ethernet buffer for the specified message path.
check_ethernet_cyclic_repetition(path: str, cycle: int, cycle_deviation: int, nb_repetition: int, check_latest_values: bool = True)

Check ethernet cyclic repetition : someip and npdu.

Parameters:
  • path – the path of the message to be checked, separated by :: (e.g. “ecu_name::service_id::instance_id::message_id”).

  • cycle – cycle of msg from NK

  • cycle_deviation – tolerance

  • nb_repetition – number of occurrence

  • check_latest_values – True to check only the last nb_repetition messages, False to check all the time intervals. Defaults to True.

Returns:

True if the message respects the required cyclicity. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "ecu_name::service_id::instance_id::message_id"  # Replace with the specific message path
>>> cycle = 100  # Replace with the cycle of the message from NK
>>> cycle_deviation = 5  # Replace with the tolerance
>>> nb_repetition = 3  # Replace with the number of required occurrences
>>> check_latest_values = True  # True to check only the last nb_repetition messages, False to check all time intervals
>>> result = tfs_instance.check_ethernet_cyclic_repetition(path, cycle, cycle_deviation, nb_repetition, check_latest_values)
# Check if the message respects the required cyclicity over someip and npdu.
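
The cyclicity criterion can be sketched on a list of reception timestamps. This is an illustration under stated assumptions, not the library's algorithm: `respects_cycle` is a hypothetical helper, timestamps and cycle share the same unit, and nb_repetition is interpreted as a number of consecutive intervals.

```python
from typing import List


def respects_cycle(timestamps: List[float], cycle: float, cycle_deviation: float,
                   nb_repetition: int, check_latest_values: bool = True) -> bool:
    """Check that reception intervals stay within cycle +/- cycle_deviation."""
    if nb_repetition <= 0:
        raise ValueError("nb_repetition must be positive")
    # Time elapsed between each pair of consecutive receptions.
    intervals = [later - earlier for earlier, later in zip(timestamps, timestamps[1:])]
    if len(intervals) < nb_repetition:
        return False  # not enough messages observed
    if check_latest_values:
        intervals = intervals[-nb_repetition:]  # only the most recent intervals
    return all(abs(interval - cycle) <= cycle_deviation for interval in intervals)


# Intervals are 250, 100, 102 and 97: the first one is an outlier.
timestamps = [0, 250, 350, 452, 549]
print(respects_cycle(timestamps, cycle=100, cycle_deviation=5, nb_repetition=3))
print(respects_cycle(timestamps, 100, 5, 3, check_latest_values=False))
```

The two calls show why check_latest_values matters: an early outlier fails the check over all intervals but is ignored when only the latest ones are considered.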
check_ethernet_reception(path: str, is_received: bool)

Check ethernet message reception : someip and npdu.

Parameters:
  • path – the path of the message to be checked separated by :: (e.g. “ecu_name::service_id::instance_id::message_id”).

  • is_received – True if the message is expected to be received, False otherwise.

Returns:

True if message received. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "ecu_name::service_id::instance_id::message_id"  # Replace with the specific message path
>>> is_received = True  # Replace with True if the message is expected to be received, False otherwise
>>> result = tfs_instance.check_ethernet_reception(path, is_received)
# Check if the ethernet message at the specified path is received based on the provided expectation.
ethernet_apply_faulty_corrupt_crc(path: str, on_off: bool)

Apply CRC corruption to an Ethernet message. This function disables or enables the CRC calculation, so the user can test sending an Ethernet payload (SOME/IP or NPDU) with a corrupted CRC.

Parameters:
  • path – Path to the Ethernet message for which CRC corruption is applied.

  • on_off – Determines whether to enable (True) or disable (False) CRC corruption.

Returns:

True if the corruption is applied (on_off is True) or disabled (on_off is False). False otherwise.

Sample format for path:

  • ECU_Name::ServiceId::InstanceId::MethodID (for Someip).

  • ECU_Name::Npdu_Id::Signal_Name (for Npdu).

  • ECU_Name::Npdu_Name::Signal_Name (for Npdu).

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> ethernet_path = "ECU_Name::ServiceId::InstanceId::MethodID"
>>> enable = True
>>> tfs_instance.ethernet_apply_faulty_corrupt_crc(ethernet_path, enable)
ethernet_apply_faulty_corrupt_alive_counter(path: str, on_off: bool)

Apply corruption to the alive counter of an Ethernet message. This function aims to disable/enable the alive counter incrementation, so the user can test sending ethernet payload (SOME/IP or NPDU) with a corrupted Alive counter.

Parameters:
  • path – Path to the Ethernet message for which alive counter corruption is applied.

  • on_off – Determines whether to enable (True) or disable (False) alive counter corruption.

Returns:

True if the fault is applied, else False.

Sample format for path:

  • ECU_Name::ServiceId::InstanceId::MethodID (for Someip).

  • ECU_Name::Npdu_Id::Signal_Name (for Npdu).

  • ECU_Name::Npdu_Name::Signal_Name (for Npdu).

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> ethernet_path = "ECU_Name::ServiceId::InstanceId::MethodID"
>>> enable = True
>>> tfs_instance.ethernet_apply_faulty_corrupt_alive_counter(ethernet_path, enable)
check_list_eth_network(paths: list[str], params: list[str], operation: int)

Check a list of Ethernet network paths.

Parameters:
  • paths – list of paths to be checked.

  • params – list of expected values.

  • operation – Specifies the operation type.

Returns:

True if the check succeeds. False otherwise.

Note: The number of paths to be checked should be equal to the number of params.

Sample values for operation:

  • 0 (All): All the given paths should succeed in order to return true.

  • 1 (ONLY ONE): At least one path should succeed to return true.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> paths = ["ECU::0x123::0x1234::0x1::val1::val2", "ECU::0x100::0x1000::0x1::val1::val3"]
>>> params = ["1", "0"]
>>> result = tfs_instance.check_list_eth_network(paths, params, 1)

Note: If the SomeIP member to be checked belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size
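
The two operation values correspond to the familiar all/any aggregation over per-path results. The sketch below shows that aggregation in isolation; `combine_results` and the constant names are hypothetical, and the per-path results are stand-in data rather than real checks.

```python
from typing import List

ALL = 0       # every path must succeed
ONLY_ONE = 1  # at least one path must succeed, as described above


def combine_results(results: List[bool], operation: int) -> bool:
    """Aggregate the per-path check results according to the operation."""
    if operation == ALL:
        return all(results)
    if operation == ONLY_ONE:
        return any(results)
    raise ValueError(f"unsupported operation: {operation}")


per_path_results = [True, False]  # stand-in for two individual path checks
print(combine_results(per_path_results, ALL))
print(combine_results(per_path_results, ONLY_ONE))
```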

send_someip_raw_payload(path: str, payload: List)

Send a raw payload over someip.

Parameters:
  • path – the path of the message id (method id, event id, notifier id, ...) to be set, separated by :: (e.g. “ecu_name::service_id::instance_id::message_id”).

  • payload – payload to be sent.

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> path = "ecu_name::service_id::instance_id::message_id"  # Replace with the specific message id path
>>> payload = [0x01, 0x02, 0x03, 0x04]  # Replace with the payload to be sent
>>> result = tfs_instance.send_someip_raw_payload(path, payload)
# Send a raw payload over someip using the provided path and payload.
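
Raw payloads are often available as hex dumps rather than as lists of integers. As a convenience sketch, the standard `bytes.fromhex` can produce the list form used in the example above; `payload_from_hex` is a hypothetical helper and not part of TestFeaturesSim.

```python
from typing import List


def payload_from_hex(hex_string: str) -> List[int]:
    """Convert a hex string such as '01 02 0A FF' into a list of byte values."""
    # bytes.fromhex skips the spaces between byte pairs.
    return list(bytes.fromhex(hex_string))


payload = payload_from_hex("01 02 03 04")
print(payload)  # [1, 2, 3, 4]
```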
start_simulations(ecu_list: List[str])

Start simulations

Parameters:

ecu_list – list of ecus for simulations to be started.

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> ecu_list = ["ECU1", "ECU2", "ECU3"]  # Replace with the list of ECUs for simulation
>>> result = tfs_instance.start_simulations(ecu_list)
# Start simulations for the specified list of ECUs.
stop_simulations(ecu_list: List[str])

Stop simulations.

Parameters:

ecu_list – list of ecus for simulations to be stopped.

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> ecu_list = ["ECU1", "ECU2", "ECU3"]  # Replace with the list of ECUs for simulation
>>> result = tfs_instance.stop_simulations(ecu_list)
# stop simulations for the specified list of ECUs.
resume_simulations(ecu_list: List[str])

Resume simulations.

Parameters:

ecu_list – list of ecus for simulations to be resumed

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> ecu_list = ["ECU1", "ECU2", "ECU3"]  # Replace with the list of ECUs for simulation
>>> result = tfs_instance.resume_simulations(ecu_list)
# resume simulations for the specified list of ECUs.
suspend_simulations(ecu_list: List[str])

Suspend simulations.

Parameters:

ecu_list – list of ecus for simulations to be suspended

Returns:

True if there is no problem appearing while executing the function. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> ecu_list = ["ECU1", "ECU2", "ECU3"]  # Replace with the list of ECUs for simulation
>>> result = tfs_instance.suspend_simulations(ecu_list)
# suspend simulations for the specified list of ECUs.
bus_observer_check_traffic(bus_name: str, timeout: int, event_type: int = 0)

Check traffic over bus during [current_time - timeout, current_time].

Note: A listener should already be started over this specific bus for this specific event.

Parameters:
  • bus_name – BusName

  • timeout – timeout in ms

  • event_type – enum { 0 : Frame event , 1 : Lin wake up event , 2 : Lin sleep event}

Returns:

True if the event is received during [current_time - timeout, current_time], otherwise False.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> bus_name = "bus_name"
>>> tfs_instance.bus_observer_check_traffic(bus_name, 1000, 0)
bus_observer_monitor_traffic(bus_name: str, timeout: int, event_type: int = 0)

Monitor traffic over bus during [ current_time - timeout, current_time]

Note: A listener should be already started over this specific bus.

Parameters:
  • bus_name – BusName

  • timeout – timeout in ms

  • event_type – enum { 0 : Frame event , 1 : Lin wake up event , 2 : Lin sleep event}

Returns:

True if the event is received during [current_time - timeout, current_time], otherwise False.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> bus_name = "bus_name"
>>> tfs_instance.bus_observer_monitor_traffic(bus_name, 1000,0)
bus_observer_stop_observer(bus_name: str, event_type: int = 0)

Stop the started observer over specific Bus.

Parameters:
  • bus_name – BusName

  • event_type – enum { 0 : Frame event , 1 : Lin wake up event , 2 : Lin sleep event}

Returns:

True if the observer is stopped.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> bus_name = "bus_name"
>>> tfs_instance.bus_observer_stop_observer(bus_name,0)
bus_observer_start_observer(bus_name: str, event_type: int = 0)

Start an observer over specific Bus.

Parameters:
  • bus_name – BusName

  • event_type – enum { 0 : Frame event , 1 : Lin wake up event , 2 : Lin sleep event}

Returns:

True if the observer is started.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> bus_name = "bus_name"
>>> tfs_instance.bus_observer_start_observer(bus_name,0)
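
Because the check and monitor calls require a started observer, it can help to pair start and stop in a context manager so the observer is stopped even if a check raises. The sketch below uses only the method names documented above; the `bus_observer` wrapper is hypothetical, and `FakeTfs` is a stand-in so the example runs without the real library.

```python
from contextlib import contextmanager


@contextmanager
def bus_observer(tfs, bus_name: str, event_type: int = 0):
    """Start an observer on entry and stop it on exit, even if the body raises."""
    if not tfs.bus_observer_start_observer(bus_name, event_type):
        raise RuntimeError(f"could not start observer on {bus_name}")
    try:
        yield tfs
    finally:
        tfs.bus_observer_stop_observer(bus_name, event_type)


class FakeTfs:
    """Stand-in that records calls; replace with a real TestFeaturesSim instance."""

    def __init__(self):
        self.calls = []

    def bus_observer_start_observer(self, bus_name, event_type=0):
        self.calls.append(("start", bus_name, event_type))
        return True

    def bus_observer_check_traffic(self, bus_name, timeout, event_type=0):
        self.calls.append(("check", bus_name, timeout, event_type))
        return True

    def bus_observer_stop_observer(self, bus_name, event_type=0):
        self.calls.append(("stop", bus_name, event_type))
        return True


fake = FakeTfs()
with bus_observer(fake, "bus_name") as tfs:
    traffic_seen = tfs.bus_observer_check_traffic("bus_name", 1000, 0)
```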
subscribe_to_nm_manager(bus_name: str, msg_id: int)

Add a message to the nm manager of the test feature simulation, so that the nm state is taken into consideration when starting the message.

Parameters:
  • bus_name – BusName

  • msg_id – message id.

Returns:

True if the message is added.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> bus_name = "BusName"  # Replace with the actual bus name
>>> msg_id = 123  # Replace with the specific message ID
>>> result = tfs_instance.subscribe_to_nm_manager(bus_name, msg_id)
# Subscribe a message to nm manager in the test feature simulation.
start_transmission_frame_sim(bus_name: str, frame_id: int) bool

Start sending a frame using the corresponding simulation.

Parameters:
  • bus_name – The bus name over which the sending will be started.

  • frame_id – The frame identifier.

Returns:

True if the start was performed.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> bus_name = "BusName"  # Replace with the specific bus name
>>> frame_id = 123  # Replace with the frame ID to be sent
>>> result = tfs_instance.start_transmission_frame_sim(bus_name, frame_id)
# Start sending a frame using the corresponding simulation.
stop_transmission_frame_sim(bus_name: str, frame_id: int) bool

Stop sending a frame using the corresponding simulation.

Parameters:
  • bus_name – The bus name over which the sending will be stopped.

  • frame_id – The frame identifier.

Returns:

True if the stop was performed.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> bus_name = "BusName"  # Replace with the specific bus name
>>> frame_id = 123  # Replace with the frame ID to be sent
>>> result = tfs_instance.stop_transmission_frame_sim(bus_name, frame_id)
# stop sending a frame using the corresponding simulation.
reset_frame_sim(bus_name: str, frame_id: int) bool

Reset frame using the corresponding simulation.

Parameters:
  • bus_name – The bus name over which the sending will be reset.

  • frame_id – The frame identifier.

Returns:

True if the reset was performed.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> bus_name = "BusName"  # Replace with the specific bus name
>>> frame_id = 123  # Replace with the frame ID to be sent
>>> result = tfs_instance.reset_frame_sim(bus_name, frame_id)
# Reset frame using the corresponding simulation.
check_monitor_happened_message(message_path, byte_indexes, byte_values, byte_masks, timeout)

Check monitor happened for message.

This function simply checks if the message with message_path at byte_indexes got the byte_values at least once during the timeout.

Parameters:
  • message_path – message path to be listening on.

  • byte_indexes – list of byte’s indexes to check on.

  • byte_values – list of byte’s values to be checked.

  • byte_masks – list of masks.

  • timeout – timeout

Returns:

True if the byte_values had been found during timeout.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> message_path = "bus_name::0x123::signal_name"
>>> byte_indexes = [0, 1, 2]
>>> byte_values = [0x11, 0x22, 0x33]
>>> byte_masks = [0xFF, 0xFF, 0xFF]
>>> timeout = 1000
>>> result = tfs_instance.check_monitor_happened_message(message_path, byte_indexes, byte_values, byte_masks, timeout)
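
The roles of byte_indexes, byte_values and byte_masks can be shown on a list of received frames. This is a sketch of the presumed matching rule, not the library's code: it assumes the mask is applied to both the received byte and the expected value, and that "happened" means at least one frame matches. Both helper names are hypothetical.

```python
from typing import Iterable, List, Sequence


def frame_matches(frame: Sequence[int], byte_indexes: List[int],
                  byte_values: List[int], byte_masks: List[int]) -> bool:
    """True if every selected byte equals its expected value after masking."""
    return all(
        (frame[index] & mask) == (value & mask)
        for index, value, mask in zip(byte_indexes, byte_values, byte_masks)
    )


def happened(frames: Iterable[Sequence[int]], byte_indexes: List[int],
             byte_values: List[int], byte_masks: List[int]) -> bool:
    """'Happened' semantics: at least one received frame matches."""
    return any(frame_matches(f, byte_indexes, byte_values, byte_masks) for f in frames)


# Stand-in payloads; only the second frame carries 0x11 in byte 0.
frames = [[0x00, 0x22, 0x33], [0x11, 0x22, 0x33]]
print(happened(frames, [0, 1, 2], [0x11, 0x22, 0x33], [0xFF, 0xFF, 0xFF]))
```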
monitor_message_is_received(message_path: str, timeout)

Check monitor for received message.

This function simply checks if the message with message_path was received at least once during the timeout.

Parameters:
  • message_path – message path to be listening on.

  • timeout – timeout

Returns:

True if the message has been received during timeout.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> message_path = "bus_name::0x123"
>>> timeout = 1000
>>> result = tfs_instance.monitor_message_is_received(message_path, timeout)

monitor_multi_messages_are_received(message_path: list[str], timeout)

Check monitor for received messages.

This function simply checks if the messages given in message_path were received at least once during the timeout.

Parameters:
  • message_path – list of message paths to be listening on.

  • timeout – timeout

Returns:

True if the messages have been received during timeout.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> message_path = ["bus_name::0x123"]
>>> timeout = 1000
>>> result = tfs_instance.monitor_multi_messages_are_received(message_path, timeout)

parse_key_pack_path(key_pack_path: str)
start_secoc(key_pack_path: str, bit_counting: ~mtf.libs.mtf_pybinder.BitCountingPolicy = <BitCountingPolicy.Sawtooth: 1>)

Start SecOC.

Parameters:
  • key_pack_path – key pack path.

  • bit_counting – bit counting policy (Monotone or Sawtooth)

Returns:

True if the starting of SecOC has succeeded.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> key_pack_path = "/path/to/key_pack"  # Replace with the actual key pack path
>>> result = tfs_instance.start_secoc(key_pack_path, BitCountingPolicy.Monotone)
# Start SecOC using the specified key pack path.
start_secoc_api(key_pack_path: str, bit_counting: ~mtf.libs.mtf_pybinder.BitCountingPolicy = <BitCountingPolicy.Sawtooth: 1>)

Start SecOC.

Returns:

True if the starting of SecOC has succeeded.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> key_pack_path = "/path/to/key_pack"  # Replace with the actual key pack path
>>> result = tfs_instance.start_secoc_api(key_pack_path)
# Start SecOC using the specified key pack path.
set_secoc_bit_counting_policy(bit_counting: ~mtf.libs.mtf_pybinder.BitCountingPolicy = <BitCountingPolicy.Sawtooth: 1>)

Set the SecOC bit counting policy.

Parameters:

bit_counting – bit counting policy (Monotone or Sawtooth)

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
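>>> bit_counting = BitCountingPolicy.Sawtooth  # Replace with the desired policy (Monotone or Sawtooth)
>>> result = tfs_instance.set_secoc_bit_counting_policy(bit_counting)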
enable_secoc()

Enable SecOC

Returns:

True if secoc is enabled.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> result = tfs_instance.enable_secoc()
disable_secoc()

Disable SecOC

Returns:

True if secoc is disabled.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> result = tfs_instance.disable_secoc()
# Disable SecOC.
update_key_pack_secoc(key_pack_path: str)

Update key pack for SecOC

Parameters:

key_pack_path – key pack path.

Returns:

True if the key pack is updated.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> key_pack_path = "/path/to/key_pack"  # Replace with the actual key pack path
>>> result = tfs_instance.update_key_pack_secoc(key_pack_path)
# Update the SecOC key pack using the specified key pack path.
enable_faulty_freshness_secoc_all()

Enable faulty freshness for SecOC (for all pdus).

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> tfs_instance.enable_faulty_freshness_secoc_all()
disable_faulty_freshness_secoc_all()

Disable faulty freshness for SecOC (for all pdus).

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> tfs_instance.disable_faulty_freshness_secoc_all()
enable_faulty_freshness_secoc(pdu_path: str)

Enable faulty freshness for SecOC (for a specific pdu).

Parameters:

pdu_path – channel_name::frame_id::pdu_name

Returns:

True if the faulty freshness is enabled.

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> pdu_path = "ChannelName::FrameID::PDUName"  # Replace with the specific PDU path
>>> result = tfs_instance.enable_faulty_freshness_secoc(pdu_path)
# Enable faulty freshness for SecOC for a specific PDU specified by pdu_path.
disable_faulty_freshness_secoc(pdu_path: str)

Disable faulty freshness for SecOC (for a specific pdu).

Parameters:

pdu_path – channel_name::frame_id::pdu_name

Returns:

True if the faulty freshness is disabled.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.disable_faulty_freshness_secoc("channel_name::frame_id::pdu_name")
overwrite_freshness_counter(value: int)

Overwrite the freshness counter.

Parameters:

value – the value of the new freshness counter

Returns:

True if the new value of freshness counter has been set successfully.

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> freshness_value = 100  # Replace with the desired freshness counter value
>>> result = tfs_instance.overwrite_freshness_counter(freshness_value)
# Overwrite the freshness counter with the specified value.
start_secoc_by_keys(keys_map: dict[int, list[int]]) bool

Add keys and start the secoc module.

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> keys_map_sec = {1: [1,2,3,4,5,6,7,8]}
>>> result = tfs_instance.start_secoc_by_keys(keys_map_sec)
add_secoc_keys(keys_map: dict[int, list[int]]) bool

Add keys for the secoc module. The module should be started before calling this API, using start_secoc or start_secoc_by_keys.

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> keys_map_sec = {1: [1,2,3,4,5,6,7,8]}
>>> keys_map_sec_1 = {2: [21,22,23,24,25,26,27,28]}
>>> tfs_instance.start_secoc_by_keys(keys_map_sec)
>>> result = tfs_instance.add_secoc_keys(keys_map_sec_1)
add_secoc_freshness_master_config(freshness_pdu_path: str, step: int) bool

Configure the freshness master pdu path and its step. The pdu cyclicity should be defined in Databex or overridden using other APIs.

Parameters:
  • freshness_pdu_path – path of the freshness pdu to simulate

  • step – step by which the pdu passed in the first parameter is incremented for each cyclic send

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> pdu_abs_path_channel = "ChannelName::0x111::PduName"
>>> pdu_abs_path_ecuname = "EcuSenderName::0x222"
>>> step = 10
>>> result = tfs_instance.add_secoc_freshness_master_config(pdu_abs_path_channel,step)
>>> result &= tfs_instance.add_secoc_freshness_master_config(pdu_abs_path_ecuname,step)
start_stop_secoc_freshness_master(freshness_pdu_path: str, state: bool) bool

Start or stop the SecOC freshness master by specifying the pdu path.

Parameters:
  • freshness_pdu_path – path of the freshness pdu to simulate

  • state – True will start freshness incrementation and respond to any freshness challenge

Returns:

True if the pdu was found and there are challenge/response pdus defined in the db used to state the application

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> pdu_abs_path_channel = "ChannelName::0x111::PduName"
>>> pdu_abs_path_ecuname = "EcuSenderName::0x222"
>>> state_ = True
>>> result = tfs_instance.start_stop_secoc_freshness_master(pdu_abs_path_channel, state_)
>>> result &= tfs_instance.start_stop_secoc_freshness_master(pdu_abs_path_ecuname, state_)
monitor_correctness_secoc(channel_name: str, pdu_name: str, frame_id: int, timeout: int)

Monitor correctness of a specific secured pdu.

Parameters:
  • channel_name – the name of the channel

  • pdu_name – the name of the pdu

  • frame_id – the id of the frame

  • timeout – a period of time defined in ms

Returns:

True if all secured data received during this timeout for this specific pdu are correct. Otherwise, False.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.monitor_correctness_secoc("channel_name","pdu_name",0x123,1000)
check_monitor_always_message(message_path, byte_indexes, byte_values, byte_masks, timeout, operation=LogicalOperation.ALL)

Check monitor Always for message.

This function simply checks if the message with message_path at byte_index got the byte_value during the timeout.

Parameters:
  • message_path – message path to be listening on.

  • byte_indexes – list of byte’s indexes to check on.

  • byte_values – list of byte’s values to be checked.

  • byte_masks – list of masks.

  • timeout – timeout in ms.

  • operation – Defaults to LogicalOperation.ALL.

Returns:

True if the byte_values had been found during timeout.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> message_path = "bus_name::0x123::signal_name"
>>> byte_indexes = [0, 1, 2]
>>> byte_values = [0x11, 0x22, 0x33]
>>> byte_masks = [0xFF, 0xFF, 0xFF]
>>> timeout = 1000
>>> result = tfs_instance.check_monitor_always_message(message_path, byte_indexes, byte_values, byte_masks, timeout)
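
The example passes byte_indexes, byte_values and byte_masks as parallel lists, one entry per byte to check. The following is a minimal sketch of how such a masked comparison could be evaluated on a single payload. It is an illustration only, not the library's implementation: the function name `payload_matches`, the `require_all` flag (standing in for LogicalOperation.ALL versus an assumed "any" mode) and the rule `(byte & mask) == (value & mask)` are all assumptions.

```python
from typing import Sequence


def payload_matches(
    payload: Sequence[int],
    byte_indexes: Sequence[int],
    byte_values: Sequence[int],
    byte_masks: Sequence[int],
    require_all: bool = True,
) -> bool:
    """Hypothetical masked comparison of selected payload bytes.

    Assumption: a byte matches when (payload[i] & mask) == (value & mask).
    require_all=True mimics LogicalOperation.ALL; False is an assumed "any" mode.
    """
    if not (len(byte_indexes) == len(byte_values) == len(byte_masks)):
        raise ValueError("byte_indexes, byte_values and byte_masks must have equal length")

    results = []
    for index, value, mask in zip(byte_indexes, byte_values, byte_masks):
        if not 0 <= index < len(payload):
            results.append(False)  # byte not present in this payload
            continue
        results.append((payload[index] & mask) == (value & mask))

    return all(results) if require_all else any(results)


# Only the high nibble of byte 2 is compared because of the 0xF0 mask.
payload = [0x11, 0x22, 0x3F, 0x00]
print(payload_matches(payload, [0, 1, 2], [0x11, 0x22, 0x33], [0xFF, 0xFF, 0xF0]))
```

A mask of 0xFF compares the whole byte, as in the documented example; narrower masks restrict the comparison to selected bits under the assumption stated above.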
get_all_protected_frame_by_secoc(channel_name: str) dict[str, set[str]]

Get all secoc frames and the corresponding sender names by bus.

Parameters:

channel_name – the name of the channel

Returns:

a dict with the frame name as key and a set of sender names as value
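
No example accompanies this method, so here is a small sketch of how the documented return shape (frame name mapped to a set of sender names) might be post-processed. The sample dictionary and the helper name `frames_by_sender` are hypothetical; in a test, the dictionary would come from get_all_protected_frame_by_secoc(channel_name).

```python
from collections import defaultdict


def frames_by_sender(protected_frames: dict[str, set[str]]) -> dict[str, set[str]]:
    """Invert {frame_name: {sender, ...}} into {sender: {frame_name, ...}}."""
    inverted: dict[str, set[str]] = defaultdict(set)
    for frame_name, senders in protected_frames.items():
        for sender in senders:
            inverted[sender].add(frame_name)
    return dict(inverted)


# Hypothetical data in the documented return shape; real values would come from
# tfs_instance.get_all_protected_frame_by_secoc("channel_name").
sample = {
    "FrameA": {"EcuX"},
    "FrameB": {"EcuX", "EcuY"},
}

for sender, frames in sorted(frames_by_sender(sample).items()):
    print(sender, sorted(frames))
```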

get_someip_last_value(path: str)

Return the last received value for someip.

Parameters:

path – e.g. Ecu_Name::Service_Id::Instance_Id::Message_Id::Member_Name

Returns:

the last received value

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> signal_path = "Ecu_Name::Service_Id::Instance_Id::Message_Id::Member_Name"
>>> value = tfs_instance.get_someip_last_value(signal_path)

Note: If the SomeIP member to be gotten belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size
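
Because the path is a plain string, a stray single colon or an empty segment is easy to miss. The sketch below assembles the five-segment form (ECU name, service id, instance id, message id, member name) joined with `::` and rejects malformed segments. The helper name `build_someip_path` is hypothetical and not part of TestFeaturesSim; IDs are passed as already-formatted strings because the expected number formatting is not specified here.

```python
def build_someip_path(ecu_name: str, service_id: str, instance_id: str,
                      message_id: str, member_name: str) -> str:
    """Join the five path segments with '::' (hypothetical helper)."""
    segments = [ecu_name, service_id, instance_id, message_id, member_name]
    for segment in segments:
        # A colon inside a segment would make the '::' separators ambiguous.
        if not segment or ":" in segment:
            raise ValueError(f"invalid path segment: {segment!r}")
    return "::".join(segments)


signal_path = build_someip_path(
    "IPF_FAR", "0x9004", "0x0001", "0x8001", "statusDrivingReadiness"
)
print(signal_path)
```

The resulting string can then be passed as the path argument of get_someip_last_value.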

get_npdu_last_value(path: str)

Return the last read npdu value.

Parameters:

path – path to the desired value, following one of these three formats: 1. EcuName::PduID 2. EcuName::PduID::SignalName 3. EcuName::PduName::SignalName

Returns:

the last received value

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.get_npdu_last_value("EcuName::PduID")
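
The accepted path formats differ in the number of segments and in whether the pdu is given by id or by name. The following sketch splits a path into its parts so that a test can check it before calling get_npdu_last_value. The names `NpduPath` and `parse_npdu_path` are hypothetical, and treating a segment as an id when it parses as an integer is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NpduPath:
    ecu_name: str
    pdu: str                           # pdu id or pdu name, kept as written
    signal_name: Optional[str] = None  # absent in the two-segment format

    @property
    def pdu_is_numeric(self) -> bool:
        """True if the pdu segment parses as an integer (e.g. '0xABC' or '2748')."""
        try:
            int(self.pdu, 0)
            return True
        except ValueError:
            return False


def parse_npdu_path(path: str) -> NpduPath:
    """Split 'Ecu::Pdu' or 'Ecu::Pdu::Signal' into its segments."""
    parts = path.split("::")
    if len(parts) not in (2, 3) or not all(parts):
        raise ValueError(f"expected 2 or 3 non-empty '::'-separated segments, got {path!r}")
    return NpduPath(*parts)


print(parse_npdu_path("EcuName::0xABC"))
print(parse_npdu_path("EcuName::PduName::SignalName"))
```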
monitor_always_eth_network(path: str, expectedValue: str, timeout: int, comparison: int, mask: int)

Monitor Always over ethernet network.

Parameters:
  • path – path (e.g. IPF_FAR::0x9004::0x0001::0x8001::statusDrivingReadiness)

  • expectedValue – expected value.

  • timeout – timeout in ms.

  • comparison – EQUAL : 0, >= : 1 , > : 2, <= : 3 , < : 4 , Not EQUAL : 5

  • mask – mask to be applied to signal.

Returns:

True if the value from ethernet network is the same as expected. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> network_path = "ECU_Name::Service_Id::Instance_Id::Method_Id"
>>> expected_value = "1"
>>> timeout = 1000
>>> comparison = 0
>>> mask = 0xFFFFFFFF
>>> result = tfs_instance.monitor_always_eth_network(network_path, expected_value, timeout, comparison, mask)

Note: If the SomeIP member to be monitored belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size
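
The comparison parameter takes one of the integer codes listed above. Bare numbers are easy to mix up in test code, so the sketch below gives them names and shows one plausible reading of how mask and comparison combine. The `Comparison` enum and the `evaluate` function are hypothetical and not part of TestFeaturesSim; applying the mask as a bitwise AND on the observed value before comparing is an assumption.

```python
import operator
from enum import IntEnum


class Comparison(IntEnum):
    """Hypothetical names for the documented comparison codes."""
    EQUAL = 0
    GREATER_OR_EQUAL = 1
    GREATER = 2
    LESS_OR_EQUAL = 3
    LESS = 4
    NOT_EQUAL = 5


_OPERATORS = {
    Comparison.EQUAL: operator.eq,
    Comparison.GREATER_OR_EQUAL: operator.ge,
    Comparison.GREATER: operator.gt,
    Comparison.LESS_OR_EQUAL: operator.le,
    Comparison.LESS: operator.lt,
    Comparison.NOT_EQUAL: operator.ne,
}


def evaluate(observed: int, expected: int, comparison: int, mask: int = 0xFFFFFFFF) -> bool:
    """Assumed semantics: mask the observed value, then compare it with expected."""
    return _OPERATORS[Comparison(comparison)](observed & mask, expected)


print(evaluate(0x1F, 0x0F, Comparison.EQUAL, mask=0x0F))
print(evaluate(5, 3, 2))  # 2 is the documented code for ">"
```

Because `Comparison` is an IntEnum, a member such as `Comparison.EQUAL` is an int with value 0 and can be passed wherever the integer code is expected.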

monitor_happened_eth_network(path: str, expectedValue: str, timeout: int, comparison: int, mask: int)

Monitor Happened over ethernet network.

Parameters:
  • path – path (e.g. ECU_Name::Service_Id::Instance_Id::Method_Id)

  • expectedValue – expected value.

  • timeout – timeout in ms.

  • comparison – EQUAL : 0, >= : 1 , > : 2, <= : 3 , < : 4 , Not EQUAL : 5

  • mask – mask to be applied to signal.

Returns:

True if the value from ethernet network is the same as expected. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> network_path = "ECU_Name::Service_Id::Instance_Id::Method_Id"
>>> expected_value = "1"
>>> timeout = 1000
>>> comparison = 0
>>> mask = 0xFFFFFFFF
>>> result = tfs_instance.monitor_happened_eth_network(network_path, expected_value, timeout, comparison, mask)

Note: If the SomeIP member to be monitored belongs to a dynamic array, then the size of the dynamic array should be set previously using this keyword: Prepare SomeIp Dynamic Length Array ${EcuName}::${ServiceId}::${InstanceId}::${MessageId}::${array_member}::1 size

monitor_someip_message_reception(path: str, is_recveied: bool, timeout: int)

Monitor someip message reception.

Parameters:
  • path – path (e.g. ECU_Name::Service_Id::Instance_Id::Method_Id)

  • is_recveied – whether the message is expected to be received.

  • timeout – timeout in ms.

Returns:

True if the specified someip message was received, or not received, as indicated by is_recveied, during the timeout passed as parameter. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> network_path = "ECU_Name::Service_Id::Instance_Id::Method_Id"
>>> is_recveied = True
>>> timeout = 1000
>>> result = tfs_instance.monitor_someip_message_reception(network_path, is_recveied, timeout)
monitor_ethernet_message_reception(path: str, is_recveied: bool, timeout: int)

Monitor ethernet (Someip and Npdu) message reception.

Parameters:
  • path – path (e.g. ECU_Name::Service_Id::Instance_Id::Method_Id)

  • is_recveied – whether the message is expected to be received.

  • timeout – timeout in ms.

Returns:

True if the specified ethernet message was received, or not received, as indicated by is_recveied, during the timeout passed as parameter. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> network_path = "ECU_Name::Service_Id::Instance_Id::Method_Id"
>>> is_recveied = True
>>> timeout = 1000
>>> result = tfs_instance.monitor_ethernet_message_reception(network_path, is_recveied, timeout)
get_model_element(model_path: str, param: str)

Execute, by reflection, the controller method named in model_path, using the parameters given in param.

Parameters:
  • model_path – controller name and method name to be invoked separated by :: (e.g. KeyLearningController::SDSKeyDataValidateOrderByBlock). Found in ControllerConfig.json file.

  • param – list of parameters separated by a comma ‘,’

Examples:
>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> model_path = "KeyLearningController::SDSKeyDataValidateOrderByBlock"  # Replace with the desired controller and method name
>>> parameters = "param1,param2,param3"  # Replace with actual parameters separated by commas found in model file
>>> result = tfs_instance.get_model_element(model_path, parameters)
# Execute the method specified by model_path with the provided parameters.
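
Both arguments are delimiter-separated strings: `::` between controller and method, and a comma between parameters. The sketch below builds and checks them before the call. The helper names `split_model_path` and `join_params` are hypothetical, and rejecting values that contain a comma is an assumption, made because no escaping mechanism is described here.

```python
def split_model_path(model_path: str) -> tuple[str, str]:
    """Split 'Controller::Method' into its two parts (hypothetical helper)."""
    parts = model_path.split("::")
    if len(parts) != 2 or not all(parts):
        raise ValueError(f"expected 'Controller::Method', got {model_path!r}")
    return parts[0], parts[1]


def join_params(*params: object) -> str:
    """Build the comma-separated param string.

    Assumption: individual values must not contain a comma, since no
    escaping mechanism is documented.
    """
    texts = [str(value) for value in params]
    for text in texts:
        if "," in text:
            raise ValueError(f"parameter contains a comma: {text!r}")
    return ",".join(texts)


controller, method = split_model_path("KeyLearningController::SDSKeyDataValidateOrderByBlock")
parameters = join_params("param1", 2, "param3")
print(controller, method, parameters)
```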
start_ethernet_simulation(ecu_name: str) bool

Start an ethernet simulation by ecu name.

Parameters:

ecu_name – ecu name.

Returns:

True if the simulation is started.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.start_ethernet_simulation("ECU")
stop_ethernet_simulation(ecu_name: str) bool

Stop an ethernet simulation by ecu name.

Parameters:

ecu_name – ecu name.

Returns:

True if the simulation is stopped.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.stop_ethernet_simulation("ECU")
send_npdu_message(ecu_name: str, pdu_id: int, payload: List, force_send: bool = False) bool

Send npdu raw payload message.

Parameters:
  • ecu_name – simulation name with respect to loaded database

  • pdu_id – nPDU id

  • payload – PDU payload

  • force_send – if true will send the payload even if no change from last payload, defaults to False

Returns:

True if the payload was sent correctly. False otherwise.

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> result = tfs_instance.send_npdu_message("ECU", 0xABC, [0xff,0xff,0xff,0xff,0xff,0xff,0xff,0xff])
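
The payload in the example is a list of integers. When test data is available as a hex string, it can be converted and checked before sending, as in the sketch below. The helper names `payload_from_hex` and `check_payload` are hypothetical, and the 0..255 range check assumes that each list element represents one byte.

```python
def payload_from_hex(hex_string: str) -> list[int]:
    """Convert a hex string such as 'ff 00 1a' into a list of byte values."""
    return list(bytes.fromhex(hex_string))


def check_payload(payload: list[int]) -> list[int]:
    """Raise ValueError unless every element is an int in the range 0..255."""
    for position, value in enumerate(payload):
        if isinstance(value, bool) or not isinstance(value, int) or not 0 <= value <= 0xFF:
            raise ValueError(f"payload[{position}] is not a byte value: {value!r}")
    return payload


payload = check_payload(payload_from_hex("ff ff ff ff ff ff ff ff"))
print(payload)
```

The resulting list has the same shape as the payload argument in the example above.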
testfeature_cleanuptestcase()

Stop all started observers and listeners.

Examples:

>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> tfs_instance.testfeature_cleanuptestcase()
# Stop all started observers and listeners.
set_reponse_method_timeout(method_path: str, timeout: int)

Sets a response method timeout.

Parameters:
  • method_path – path of the method, in the form Provider_ECU_Name::Service_ID::Instance_Id::Method_ID

  • timeout – timeout in ms

Returns:

True if the timeout is set. False otherwise.

Examples:
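>>> tfs_instance = TestFeaturesSim()  # Create an instance of TestFeaturesSim
>>> method_path = "Provider_ECU_Name::Service_ID::Instance_Id::Method_ID"
>>> timeout = 1000
>>> result = tfs_instance.set_reponse_method_timeout(method_path, timeout)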
control_someip_method_response(method_path: str, method_input_member: str, method_return_member: str, on_off: bool)

Control a someip method response.

Use this function to enable or disable setting the method response from the value of the method request. Provide the ‘method_path’, ‘method_input_member’, ‘method_return_member’ and ‘on_off’ parameters.

Parameters:
  • method_path – the identifier of the method, in the format Provider_ECU_Name::Service_Id::Instance_Id::Method_Id

  • method_input_member – the input parameter for the method response, in the format ‘INPUT::method_input_member’

  • method_return_member – the return parameter for the method response, in the format ‘OUTPUT::method_return_member’

  • on_off – a boolean value indicating whether to stop (False) or resume (True) sending the response based on input and return values

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> # Enable method response control based on input and return values
>>> tfs_instance.control_someip_method_response("Provider_ECU_Name::Service_Id::Instance_Id::Method_Id", "INPUT::method_input_member", "OUTPUT::method_return_member", True)
>>> tfs_instance.control_someip_method_response("X_Y_Z::0x1013::0x0001::0x1", "INPUT::ping", "OUTPUT::pong", True)
>>> # Disable method response control
>>> tfs_instance.control_someip_method_response("Provider_ECU_Name::Service_Id::Instance_Id::Method_Id", "INPUT::method_input_member", "OUTPUT::method_return_member", False)
>>> tfs_instance.control_someip_method_response("X_Y_Z::0x1013::0x0001::0x1", "INPUT::ping", "OUTPUT::pong", False)

start_stop_methode_response(method_path: str, on_off: bool)

Stop or resume automatically sending the response for a method. Provide the ‘method_path’ and ‘on_off’ parameters.

Parameters:
  • method_path – the identifier of the method, in the format Provider_ECU_Name::Service_Id::Instance_Id::Method_Id

  • on_off – a boolean value indicating whether to stop (False) or resume (True) sending the response

Examples:
>>> tfs_instance = TestFeaturesSim()
>>> # Stop sending responses
>>> tfs_instance.start_stop_methode_response("Provider_ECU_Name::Service_Id::Instance_Id::Method_Id", False)
>>> tfs_instance.start_stop_methode_response("X_Y_Z::0x1013::0x0001::0x1", False)
>>> # Resume sending responses
>>> tfs_instance.start_stop_methode_response("Provider_ECU_Name::Service_Id::Instance_Id::Method_Id", True)
>>> tfs_instance.start_stop_methode_response("X_Y_Z::0x1013::0x0001::0x1", True)