ANDi frequent use cases

Creating and sending SOME/IP-SD messages

To create and send a SOME/IP-SD message, follow these steps:

1. Perform mapping of channels to adapters

  1. Click on adapters in the Scripting toolbar

  2. The configuration dialog appears

  3. Map the channels to the adapters

../_images/adapters.png

2. Create message object

  1. Right click on Messages

  2. Select message to be added

../_images/add_message.png

3. Create Script

  1. Right click on Scripts

  2. Select Add Script

  3. Open the Script in the editor and configure the message

  4. Send message object

from globals import *

# set ethernet and ip header
someip_msg_1.ethernet_header.mac_address_source = "11:22:33:44:55:66"
someip_msg_1.ip_header.ip_address_source = "160.48.199.55"
someip_msg_1.ethernet_header.mac_address_destination = "99:88:77:66:55:44"
someip_msg_1.ip_header.ip_address_destination = "160.48.199.66"

# set SOME/IP specific fields
someip_msg_1.someip_header.service_identifier = 0x1111
someip_msg_1.someip_header.method_identifier = 0x2222
someip_msg_1.someip_header.interface_version = 0x02
someip_msg_1.someip_header.protocol_version = 0x03
someip_msg_1.someip_header.request_id = 0x4444

# set payload of SOME/IP message
someip_msg_1.payload = System.Array[Byte]([0x11,0x22,0x33])

# send the message
someip_msg_1.send()

Creating and sending messages via Drag and Drop

1. Load a Fibex database file

2. In the Messages view, press and hold the mouse button on an event

  1. With the button still pressed, drag the event to Messages in the Scripting view (global or local)

  2. Release the mouse button

  3. A new message object is created

3. The message object can be used in the scripts

../_images/event_via_drag_and_drop.gif

Receiving packets

import time

# dynamically create SOME/IP-SD packet
msg_someip_sd = message_builder.create_someip_sd_message("channel_01", "channel_01")

# callback function invoked on each incoming SOME/IP-SD packet
def on_msg_received(msg):
        # do logic here
        print(msg)

# register callback function
msg_someip_sd.on_message_received += on_msg_received

# start live capturing for SOME/IP-SD packets
msg_someip_sd.start_capture()

# sleep for 5 seconds
time.sleep(5)

# de-register callback function
msg_someip_sd.on_message_received -= on_msg_received

# stop capturing
msg_someip_sd.stop_capture()

Executing and stopping scripts

There are several ways to execute and stop scripts:

1. Start/Stop button of the main menu

  1. Select scripts which should be executed

  2. Press on the Start/Stop button

../_images/script1.png

2. Right click on the script name

../_images/scriptmethod2.png

3. Start/Stop button of the editor

../_images/scriptmethod3.png

Cyclic procedures using timers

  • Timers can be used to implement cyclic procedures

  • Timer events:

  1. on_time_elapsed(source, current_date)

  • The interval attribute of the timer specifies its cycle time

  • Each time the interval expires, the on_time_elapsed event is triggered and the registered callback functions are executed

  2. on_time_out(source, current_date)

  • When starting the timer, a total running time can optionally be specified

  • When the given running time expires, the on_time_out event is triggered and the registered callback functions are executed

  • Timer methods:

  1. start()

  2. start(timeout_ms)

  3. stop()

  4. reset()

Note

Registered callback functions are executed in separate threads.

Make sure that a callback function does not take longer to execute than the specified interval. Otherwise, successive invocations can overlap.
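
The overlap described in the note can be reproduced and avoided without ANDi. The following is a minimal sketch in plain Python, assuming only that each timer tick runs the callback in its own thread, as the note states. The helper make_non_overlapping is hypothetical and not part of the ANDi API; it wraps a callback so that a tick is skipped while the previous invocation is still running.

```python
import threading
import time

def make_non_overlapping(callback):
    """Wrap callback so that concurrent invocations are skipped, not stacked."""
    busy = threading.Lock()

    def guarded(source, current_date):
        # Non-blocking acquire: returns False if a previous call is still running.
        if not busy.acquire(False):
            return False
        try:
            callback(source, current_date)
            return True
        finally:
            busy.release()

    return guarded

# Demonstration: a slow callback (0.2 s) driven by two ticks that are 0.05 s apart.
calls = []

def slow_callback(source, current_date):
    calls.append(current_date)
    time.sleep(0.2)

guarded = make_non_overlapping(slow_callback)
results = []
threads = [threading.Thread(target=lambda i=i: results.append(guarded(None, i)))
           for i in range(2)]
for t in threads:
    t.start()
    time.sleep(0.05)  # the second tick arrives while the first is still running
for t in threads:
    t.join()

print(sorted(results))  # one tick was executed, one was skipped
```

In an ANDi script the wrapped function would be registered in place of the original callback. Whether skipping a tick or queueing it is the better choice depends on the test case.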

Cyclic sending of a SOME/IP Event

from globals import *
import time

msg_someip.ethernet_header.mac_address_destination = "d8:18:2b:80:19:be"
msg_someip.ethernet_header.mac_address_source = "38:06:b4:80:03:52"
msg_someip.ip_header.ip_address_destination = "160.48.199.16"
msg_someip.ip_header.ip_address_source = "160.48.199.93"
msg_someip.transport_header.port_destination = 30501
msg_someip.transport_header.port_source = 30501
msg_someip.someip_header.service_identifier = 0xb512
msg_someip.someip_header.method_identifier = 0x01
msg_someip.someip_header.protocol_version = 0x01
msg_someip.someip_header.interface_version = 0x01
msg_someip.someip_header.message_type = MessageType.REQUEST
msg_someip.someip_header.return_code = ReturnCode.E_OK
msg_someip.payload = System.Array[Byte]([0x0c, 0x04, 0x5d])

def on_time_elapsed(a,b):
        msg_someip.send()

g_timer_2.on_time_elapsed += on_time_elapsed
g_timer_2.interval = 1000
g_timer_2.start()

time.sleep(5)

g_timer_2.stop()
g_timer_2.on_time_elapsed -= on_time_elapsed

Packetized SOME/IP packets

  • SOME/IP message objects support messages that contain several SOME/IP frames (packetized messages)

msg_someip_1.append_message(msg_someip_2) adds msg_someip_2 to msg_someip_1

  • The frames of a packetized SOME/IP message can be accessed and analyzed separately:

msg_someip.messages returns a list of the SOME/IP frames contained in the msg_someip message
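
To make the idea of several frames in one packet concrete, here is a minimal sketch in plain Python that works on raw bytes instead of ANDi message objects. It assumes the standard 16-byte SOME/IP header layout (message ID, length, request ID, protocol version, interface version, message type, return code). The helpers build_frame and split_frames are hypothetical names for illustration and say nothing about how ANDi implements append_message internally.

```python
import struct

# message id, length, request id, then four single-byte fields (16 bytes total)
SOMEIP_HEADER = struct.Struct(">IIIBBBB")

def build_frame(service_id, method_id, request_id, payload,
                protocol_version=0x01, interface_version=0x01,
                message_type=0x00, return_code=0x00):
    """Serialize one SOME/IP frame: 16-byte header followed by the payload."""
    message_id = (service_id << 16) | method_id
    # The length field counts everything after itself: 8 header bytes + payload.
    length = 8 + len(payload)
    header = SOMEIP_HEADER.pack(message_id, length, request_id,
                                protocol_version, interface_version,
                                message_type, return_code)
    return header + bytes(bytearray(payload))

def split_frames(data):
    """Split a transport payload into a list of (message_id, payload) tuples."""
    frames = []
    offset = 0
    # Trailing bytes shorter than a full header are ignored.
    while offset + SOMEIP_HEADER.size <= len(data):
        message_id, length = struct.unpack_from(">II", data, offset)
        end = offset + 8 + length  # 8 = message id field + length field
        if length < 8 or end > len(data):
            raise ValueError("malformed SOME/IP frame at offset {0}".format(offset))
        frames.append((message_id, data[offset + SOMEIP_HEADER.size:end]))
        offset = end
    return frames

# Two frames concatenated into one packetized payload
packet = (build_frame(0x1111, 0x2222, 0x4444, [0x11, 0x22, 0x33]) +
          build_frame(0x3333, 0x4444, 0x5555, [0x44, 0x55, 0x66]))

for message_id, payload in split_frames(packet):
    print("0x{0:08x} {1}".format(message_id, [hex(b) for b in bytearray(payload)]))
```

The length field of each frame is what allows a receiver to find the start of the next frame in the same packet.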

Sending of Packetized SOME/IP Frames

from globals import *

# set ethernet and ip header
someip_msg_1.ethernet_header.mac_address_source = "11:22:33:44:55:66"
someip_msg_1.ip_header.ip_address_source = "160.48.199.55"
someip_msg_1.ethernet_header.mac_address_destination = "99:88:77:66:55:44"
someip_msg_1.ip_header.ip_address_destination = "160.48.199.66"

# set SOME/IP specific fields
someip_msg_1.someip_header.service_identifier = 0x1111
someip_msg_1.someip_header.method_identifier = 0x2222
someip_msg_1.someip_header.interface_version = 0x02
someip_msg_1.someip_header.protocol_version = 0x03
someip_msg_1.someip_header.request_id = 0x4444

# set payload of SOME/IP message 1
someip_msg_1.payload = System.Array[Byte]([0x11,0x22,0x33])

# set SOME/IP specific fields of message 2
msg_someip_1.someip_header.service_identifier = 0x3333
msg_someip_1.someip_header.method_identifier = 0x4444
msg_someip_1.someip_header.interface_version = 0x02
msg_someip_1.someip_header.protocol_version = 0x03
msg_someip_1.someip_header.request_id = 0x5555

# set payload of SOME/IP message 2
msg_someip_1.payload = System.Array[Byte]([0x44,0x55,0x66])

# append SOME/IP message 2 to SOME/IP message 1
someip_msg_1.append_message(msg_someip_1)

# send the message 1
someip_msg_1.send()

Receiving Packetized SOME/IP Frames

from globals import *

# returns the payload bytes as a list of hex strings
def get_int_list(system_bytes):
        result = []
        for s in system_bytes:
                result.append(hex(int(s)))
        return result

# running counter of the SOME/IP frames seen so far
someip_frame = 0
def on_msg_received(msg):
        global someip_frame
        # loop over the SOME/IP frames contained in the received SOME/IP message
        for m in msg.messages:
                someip_frame += 1
                print("[{0}] 0x{1:x}".format(someip_frame, m.someip_header.message_id))
                print("[{0}] {1}".format(someip_frame, get_int_list(m.payload)))

# add on_msg_received method as listener
msg_someip.on_message_received += on_msg_received

# Capturing from a pcap file, second parameter means synchronous
msg_someip.start_capture(someip_empfangen_paketized.TestProject.DirectoryName + "\\pcap\\someip_paketized_one_packet.pcap",True)

# clean up
msg_someip.on_message_received -= on_msg_received
msg_someip.stop_capture()

Adding SOME/IP-SD Entries

../_images/addingentries.png

SOME/IP-SD messages provide several methods for adding entries to the message. They fall into two categories:

  1. Using another entry

  2. Specifying each field of the entry

SOME/IP-SD with several Entries

msg_someip_sd = message_builder.create_someip_sd_message("channel_01", "channel_01")
msg_someip_sd.add_find_service_entry(0x1111,0x01,0x01,0x01,0x01)
msg_someip_sd.add_offer_service_entry(0x2222,0x01,0x01,0x01,0x01)
msg_someip_sd.add_subscribe_event_group_entry(0x3333,0x01,0x01,0x01,0x01)
msg_someip_sd.add_subscribe_event_group_ack_entry(0x4444,0x01,0x01,0x01,0x01)
msg_someip_sd.add_stop_offer_service_entry(0x5555,0x01,0x01,0x01,0x01)
msg_someip_sd.add_subscribe_event_group_ack_entry(0x6666,0x01,0x01,0x01,0x01)

msg_someip_sd.send()

Note: this packet does not comply with the SOME/IP-SD requirements, because it combines incompatible entries in one packet. ANDi allows this, e.g. to perform negative tests.

Extracting SOME/IP-SD Entries

  • SOME/IP-SD messages provide methods for extracting the entries out of the message

  • Those methods return a list of the respective entry type

msg_someip_sd.get_find_service_entries()
msg_someip_sd.get_offer_service_entries()
msg_someip_sd.get_stop_offer_service_entries()
msg_someip_sd.get_subscribe_event_group_entries()
msg_someip_sd.get_subscribe_event_group_ack_entries()
msg_someip_sd.get_subscribe_event_group_nack_entries()
msg_someip_sd.get_stop_subscribe_event_group_entries()


from globals import *

# callback function, invoked on each incoming SOME/IP-SD packet
def on_msg_received(msg):
        for f in msg.get_find_service_entries():
                print("find service-id: {0}".format(f.service_id))
        for o in msg.get_offer_service_entries():
                print("offer service-id: {0}".format(o.service_id))
        for so in msg.get_stop_offer_service_entries():
                print("stop offer service-id: {0}".format(so.service_id))
        for s in msg.get_subscribe_event_group_entries():
                print("subscribe service-id: {0}".format(s.service_id))
        for sa in msg.get_subscribe_event_group_ack_entries():
                print("subscribe ack service-id: {0}".format(sa.service_id))
        for san in msg.get_subscribe_event_group_nack_entries():
                print("subscribe nack service-id: {0}".format(san.service_id))
        for sse in msg.get_stop_subscribe_event_group_entries():
                print("stop subscribe service-id: {0}".format(sse.service_id))

# register callback function
msg_someip_sd.on_message_received += on_msg_received

# capture SOME/IP-SD packets from a pcap file, second parameter means synchronous
msg_someip_sd.start_capture(someip_sd_entries_extrahieren.TestProject.DirectoryName + "\\pcap\\someip_sd_multiple_entries.pcap", True)

# de-register callback function
msg_someip_sd.on_message_received -= on_msg_received

# stop capturing
msg_someip_sd.stop_capture()
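
The seven getter methods above correspond to only four entry type values on the wire. In the SOME/IP-SD specification the "stop" and "nack" variants are the same entry types with a TTL of zero. The following plain-Python sketch shows that mapping on raw 16-byte entries. It is an illustration under that assumption only: classify_entry and make_entry are hypothetical helpers, and this document does not state how ANDi distinguishes the entries internally.

```python
import struct

# Entry type values defined by the SOME/IP-SD specification
FIND_SERVICE = 0x00
OFFER_SERVICE = 0x01
SUBSCRIBE_EVENTGROUP = 0x06
SUBSCRIBE_EVENTGROUP_ACK = 0x07

# (entry type, TTL is non-zero) -> entry kind
ENTRY_KINDS = {
    (FIND_SERVICE, True): "find service",
    (OFFER_SERVICE, True): "offer service",
    (OFFER_SERVICE, False): "stop offer service",
    (SUBSCRIBE_EVENTGROUP, True): "subscribe eventgroup",
    (SUBSCRIBE_EVENTGROUP, False): "stop subscribe eventgroup",
    (SUBSCRIBE_EVENTGROUP_ACK, True): "subscribe eventgroup ack",
    (SUBSCRIBE_EVENTGROUP_ACK, False): "subscribe eventgroup nack",
}

def make_entry(entry_type, service_id, ttl, instance_id=0x0001, major=0x01):
    """Build a raw 16-byte entry; option fields and the last 4 bytes stay zero."""
    return struct.pack(">BBBBHHII", entry_type, 0, 0, 0,
                       service_id, instance_id, (major << 24) | ttl, 0)

def classify_entry(raw_entry):
    """Return (kind, service_id) for one raw 16-byte SOME/IP-SD entry."""
    if len(raw_entry) != 16:
        raise ValueError("a SOME/IP-SD entry is exactly 16 bytes long")
    # Byte 0 is the type, bytes 4..5 are the service ID.
    entry_type, service_id = struct.unpack_from(">B3xH", raw_entry, 0)
    # Bytes 8..11 hold the major version (1 byte) followed by the 24-bit TTL.
    ttl = struct.unpack_from(">I", raw_entry, 8)[0] & 0xFFFFFF
    kind = ENTRY_KINDS.get((entry_type, ttl != 0), "unknown")
    return kind, service_id

for raw in (make_entry(OFFER_SERVICE, 0x2222, 3),
            make_entry(OFFER_SERVICE, 0x5555, 0),
            make_entry(SUBSCRIBE_EVENTGROUP_ACK, 0x4444, 0)):
    kind, service_id = classify_entry(raw)
    print("{0} service-id: 0x{1:04x}".format(kind, service_id))
```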

Adding SOME/IP-SD Option

msg_someip_sd.add_ipv4_option("address",port,isUDP,isMulticast)
msg_someip_sd.add_ipv4_option(e,port,"address",isUDP,isMulticast)
msg_someip_sd.add_ipv4_option(e,port,"address",isUDP,isMulticast,index)
  • Currently IPv4 and IPv6 options are supported

  • If an entry is passed, the number of options of that entry is incremented automatically

  • If the automatically computed number of options is not the desired one, the value can be set explicitly on the entry

entry.flag_op_1

entry.flag_op_2

msg_someip_sd = message_builder.create_someip_sd_message("channel_01", "channel_01")

# create offer entries
o_entry_1 = msg_someip_sd.add_offer_service_entry(0x1111,0x01,0x01,0x01,0x01)
o_entry_2 = msg_someip_sd.add_offer_service_entry(0x2222,0x01,0x01,0x01,0x01)

# set number of options for second offer entry
o_entry_2.flag_op_1 = 1

# add a UDP IPv4 option and assign it to the first offer entry
msg_someip_sd.add_ipv4_option(o_entry_1,30490,"192.168.0.2",True,False)
# add a TCP IPv4 option and assign it to the first offer entry
msg_someip_sd.add_ipv4_option(o_entry_1,30501,"192.168.0.2",False,False)

# send message
msg_someip_sd.send()
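
For orientation, the following plain-Python sketch shows what an IPv4 option with the same four inputs (address, port, isUDP, isMulticast) looks like on the wire according to the SOME/IP-SD specification, and how the two option counters of an entry share one byte. The functions build_ipv4_option and pack_option_counts are hypothetical helpers for illustration. That flag_op_1 and flag_op_2 correspond to these two counters is an assumption; this document does not say so explicitly.

```python
import binascii
import socket
import struct

def build_ipv4_option(address, port, is_udp, is_multicast):
    """Serialize an IPv4 endpoint (0x04) or IPv4 multicast (0x14) option."""
    option_type = 0x14 if is_multicast else 0x04
    l4_protocol = 0x11 if is_udp else 0x06  # IP protocol numbers: UDP=17, TCP=6
    # Length is fixed at 9: it counts the bytes after the length and type fields.
    return struct.pack(">HBB4sBBH",
                       0x0009,                      # length
                       option_type,                 # type
                       0x00,                        # reserved
                       socket.inet_aton(address),   # IPv4 address, 4 bytes
                       0x00,                        # reserved
                       l4_protocol,                 # transport protocol
                       port)                        # port number

def pack_option_counts(num_opt_1, num_opt_2):
    """Combine both 4-bit option counters of an entry into a single byte."""
    if not (0 <= num_opt_1 <= 15 and 0 <= num_opt_2 <= 15):
        raise ValueError("each option counter must fit into 4 bits")
    return (num_opt_1 << 4) | num_opt_2

udp_option = build_ipv4_option("192.168.0.2", 30490, True, False)
tcp_option = build_ipv4_option("192.168.0.2", 30501, False, False)

print(binascii.hexlify(udp_option).decode("ascii"))
print(binascii.hexlify(tcp_option).decode("ascii"))
print(hex(pack_option_counts(2, 0)))
```

Because each counter is only 4 bits wide, one entry can reference at most 15 options per option run.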

Extracting SOME/IP-SD Option

  • The options referenced by an entry are available through the entry.options attribute of the entry.

  • entry.options returns a list of options

from globals import *

# callback function, invoked on each incoming SOME/IP-SD packet
def on_msg_received(msg):
        for o in msg.get_offer_service_entries():
                print("offer service-id: {0}".format(hex(o.service_id)))
                # iterate over all referenced options
                for op in o.options:
                        print("option: {0} - port: {1}".format(op.ip_address, op.option_port))

# register callback function
msg_someip_sd.on_message_received += on_msg_received

# capture SOME/IP-SD packets from a pcap file, second parameter means synchronous
msg_someip_sd.start_capture(someip_sd_entries_extrahieren.TestProject.DirectoryName + "\\pcap\\someip_sd_multiple_options.pcap", True)

# de-register callback function
msg_someip_sd.on_message_received -= on_msg_received

# stop capturing
msg_someip_sd.stop_capture()

../_images/extractingoptions2.png

Deserializing SOME/IP packet (payload simple parameters)

def on_msg_received(eth_msg):
        if eth_msg.has_layer(PROTOCOL_TYPE.IP):
                ip_layer = eth_msg.get_layer(PROTOCOL_TYPE.IP)
                ip_source = ip_layer.ip_address_source
                if eth_msg.has_layer(PROTOCOL_TYPE.SOMEIP) and not eth_msg.has_layer(PROTOCOL_TYPE.SOMEIP_SD):
                        msg_someip = eth_msg.get_someip_layer()
                        for msg in msg_someip.messages:
                                someip_layer = eth_msg.get_layer(PROTOCOL_TYPE.SOMEIP)
                                message_id = hex(someip_layer.someip_header.message_id).rstrip("L")
                                try:
                                        print("#  Trying to deserialize message_id: {0}".format(message_id))
                                        msg.data_base = G_FIBEX_DB
                                        dictionary_params = msg.get_input_parameters()

                                        for param in dictionary_params:
                                                if not param.is_complex and not param.is_array:   # SIMPLE NOT ARRAY
                                                        if param.is_enum:
                                                                print("Parameter: ", param.name, ": ", param.value, "(ENUM)")
                                                        else:
                                                                print("Parameter: ", param.name, ": ", param.value, "(", param.base_data, ")")

                                except Exception as e:
                                        print("[ERROR] Problem deserializing current packet")
                                        print("{0}".format(e))
        return False

Deserializing SOME/IP packet (payload complex parameters)

def get_member(parameter, depth):
        if not parameter.is_complex:
                if parameter.is_enum:
                        return "*"*(depth -1) + ">{0} ({1}): {2} (ENUM)".format(parameter.name, parameter.base_data, parameter.value)
                else:
                        return "*"*(depth -1) + ">{0} ({1}): {2}".format(parameter.name, parameter.base_data, parameter.value)
        else:
                members_string = "*"*(depth -1)
                if depth > 1:
                        members_string += ">" + parameter.name + "< (complex)\n"
                for member in parameter.members:
                        members_string +=  get_member(member, depth+1) + "\n"
                return members_string


def on_msg_received(eth_msg):
        if eth_msg.has_layer(PROTOCOL_TYPE.IP):
                        ip_layer = eth_msg.get_layer(PROTOCOL_TYPE.IP)
                        ip_source = ip_layer.ip_address_source
                        if eth_msg.has_layer(PROTOCOL_TYPE.SOMEIP) and not eth_msg.has_layer(PROTOCOL_TYPE.SOMEIP_SD):
                                msg_someip = eth_msg.get_someip_layer()
                                for msg in msg_someip.messages:
                                        someip_layer = eth_msg.get_layer(PROTOCOL_TYPE.SOMEIP)
                                        message_id = hex(someip_layer.someip_header.message_id).rstrip("L")

                                        try:
                                                print("#  Trying to deserialize message_id: {0}".format(message_id))
                                                msg.data_base = G_FIBEX_DB
                                                dictionary_params = msg.get_input_parameters()

                                                for param in dictionary_params:
                                                        if param.is_complex:   # COMPLEX PARAM
                                                                print("Parameter: ", param.name, ": COMPLEX TYPE \n", get_member(param, 1))

                                        except Exception as e:
                                                print("[ERROR] Problem deserializing current packet")
                                                print("{0}".format(e))
        return False

Deserializing SOME/IP packet (payload with array)

def get_member(parameter, depth):
        if not parameter.is_complex:
                if parameter.is_enum:
                        return "*"*(depth -1) + ">{0} ({1}): {2} (ENUM)".format(parameter.name, parameter.base_data, parameter.value)
                else:
                        return "*"*(depth -1) + ">{0} ({1}): {2}".format(parameter.name, parameter.base_data, parameter.value)
        else:
                members_string = "*"*(depth -1)
                if depth > 1:
                        members_string += ">" + parameter.name + "< (complex)\n"
                for member in parameter.members:
                        members_string +=  get_member(member, depth+1) + "\n"
                return members_string



def get_array_members(array_complex):
        members_string = ""
        for array_element in array_complex:
                for parameter in array_element.members_objects:
                        if not parameter.is_complex:
                                if parameter.is_enum:
                                        members_string += "{0} (ENUM)".format(parameter.value)
                                else:
                                        members_string += "{0} ({1})".format(parameter.value, parameter.type)
                        else:
                                # complex element: format each member starting at depth 1
                                for member in parameter.members:
                                        members_string += get_member(member, 1)
        return members_string




def on_msg_received(eth_msg):
        if eth_msg.has_layer(PROTOCOL_TYPE.IP):
                        ip_layer = eth_msg.get_layer(PROTOCOL_TYPE.IP)
                        ip_source = ip_layer.ip_address_source
                        if eth_msg.has_layer(PROTOCOL_TYPE.SOMEIP) and not eth_msg.has_layer(PROTOCOL_TYPE.SOMEIP_SD):
                                msg_someip = eth_msg.get_someip_layer()
                                for msg in msg_someip.messages:
                                        someip_layer = eth_msg.get_layer(PROTOCOL_TYPE.SOMEIP)
                                        message_id = hex(someip_layer.someip_header.message_id).rstrip("L")

                                        try:
                                                print("#  Trying to deserialize message_id: {0}".format(message_id))
                                                msg.data_base = G_FIBEX_DB
                                                dictionary_params = msg.get_input_parameters()

                                                for param in dictionary_params:
                                                        if param.is_array:   #
                                                                if param.is_complex:   # ARRAY COMPLEX
                                                                        print("Parameter: ", param.name, ": (ARRAY COMPLEX) ", len(param.table_complex), " elements")
                                                                        for el in param.table_complex:
                                                                                for member in el.members:
                                                                                        print(get_member(member.Value.parameter, 1))
                                                                else:                  # ARRAY SIMPLE
                                                                        print("Parameter: ", param.name, ": (ARRAY SIMPLE)")
                                                                        for array_elem in param.arrays:
                                                                                for num in range(array_elem.num_elements):
                                                                                        print("* dimension: {0}, elem: {1} -> {2} ".format(array_elem.dimension, num, param.value[array_elem.dimension - 1 + num]))

                                        except Exception as e:
                                                print("[ERROR] Problem deserializing current packet")
                                                print("{0}".format(e))
        return False

Storing received data to a pcap file

from globals import *
import time

def on_time_elapsed(t,a):
        # set icmp request message
        msg_icmp = message_builder.create_icmp_message()
        msg_icmp.type_code = ICMPv4TypeCodes1.EchoRequest
        msg_icmp.ethernet_header.mac_address_source = "00:11:22:33:44:66"
        msg_icmp.ethernet_header.mac_address_destination = "00:11:22:33:44:55"
        msg_icmp.ip_header.ip_address_source = "192.168.2.10"
        msg_icmp.ip_header.ip_address_destination = "192.168.2.11"
        msg_icmp.send()

timer_1.on_time_elapsed += on_time_elapsed
adapter0.start_record(adapter_store_pcap.TestProject.DirectoryName + "\\pcap\\store_pcap_example.pcap")
timer_1.start()

time.sleep(5)
timer_1.on_time_elapsed -= on_time_elapsed
timer_1.stop()
adapter0.stop_record()

Creating messages using message_builder

  • Message objects can be created dynamically

  • For this purpose, use the message_builder.

  • Each create method has two signatures:

  1. With parameters: the sender and the receiver adapter are passed

  2. Without parameters: the default adapter is used as both sender and receiver

  • Properties of the created message objects can be modified using the API

message_builder.create_1722_message()
message_builder.create_arp_message()
message_builder.create_dhcp_message()
message_builder.create_ethernet_message()
message_builder.create_hsfz_message()
message_builder.create_icmp_message()
message_builder.create_ip_message()
message_builder.create_ptp_message()
message_builder.create_someip_message()
message_builder.create_someip_sd_message()
message_builder.create_tcp_message()
message_builder.create_tftp_message()
message_builder.create_udp_message()
message_builder.create_udp_nm_message()

Accessing properties of different layers

  • To set or get a property of a specific protocol layer, that layer must be selected explicitly, even if it is the top protocol layer of the message.

  • Example: setting the unicast flag of a SOME/IP-SD message.

msg_someip_sd = message_builder.create_someip_sd_message()
msg_someip_sd.ethernet_header.mac_address_source = "00:11:22:33:44:55"
msg_someip_sd.vlan_tag.vlan_priority_tag = 0x01
msg_someip_sd.ip_header.ip_address_destination = "192.168.1.3"
msg_someip_sd.transport_header.port_destination = 1234
msg_someip_sd.someip_header.client_id = 0x1234

# Select SOME/IP-SD header in order to set unicast_flag to 0x01
msg_someip_sd.someip_sd_header.unicast_flag = 0x01

Message object responding machine

from globals import *
import time

def on_time_elapsed(t,a):

        # Create Ping Request
        msg_icmp.type_code = ICMPv4TypeCodes1.EchoRequest
        msg_icmp.ethernet_header.mac_address_source = "00:11:22:33:44:66"
        msg_icmp.ethernet_header.mac_address_destination = "00:11:22:33:44:55"
        msg_icmp.ip_header.ip_address_source = "192.168.2.10"
        msg_icmp.ip_header.ip_address_destination = "192.168.2.11"
        msg_icmp.send()

timer_1.on_time_elapsed += on_time_elapsed

def is_request(msg_src, msg_received):
        if msg_received.ip_header.ip_address_destination == "192.168.2.11" and msg_received.type_code == ICMPv4TypeCodes1.EchoRequest:
                return True
        return False

def make_reply(msg_src, msg_received):

        # Create Ping Reply
        msg_src.type_code = ICMPv4TypeCodes1.EchoReply
        msg_src.ip_header.ip_address_source = msg_received.ip_header.ip_address_destination
        msg_src.ip_header.ip_address_destination = msg_received.ip_header.ip_address_source
        msg_src.ethernet_header.mac_address_source = msg_received.ethernet_header.mac_address_destination
        msg_src.ethernet_header.mac_address_destination = msg_received.ethernet_header.mac_address_source
        msg_src.send()


msg_icmp = message_builder.create_icmp_message("channel_1", "channel_1")

# Register Callback Function
msg_icmp.is_request += is_request
msg_icmp.make_reply += make_reply

# Starting Responding Machine Functionality
msg_icmp.start_responding_machine()
timer_1.start()

time.sleep(5)

timer_1.stop()
msg_icmp.stop_responding_machine()
msg_icmp.is_request -= is_request
msg_icmp.make_reply -= make_reply

Handling of Fibex

  • ANDi provides methods for:

  1. Getting information out of Fibex files:

    1. ECUs

    2. Consumed and provided services depending on ECU ID, ECU name, IP, instance IDs, etc.

    3. Events, eventgroups and fields

  2. Creating entries: offers, subscribes, etc.

  • Returned objects in turn have methods and properties to get further information

# Only a small extract of the whole list
# Use the autocompletion or the ANDi help to get all possibilities

database_1.get_all_ecus()
database_1.get_all_services()
database_1.get_consumed_event_groups(service_id,"ecu_name")
database_1.get_consumed_event_groups_by_ip(service_id,"ecu_name","ip")
database_1.get_consumed_instance_ids(service_id,"ecu_name")
database_1.get_consumed_instance_ids_by_ip(service_id,"ecu_name","ip")
database_1.get_consumed_services("ecu_name",service_id)
database_1.get_consumed_services_by_ecu_id("ECUID")
database_1.get_consumed_services_by_ecu_ip("ecu_name","ip")
database_1.get_consumed_services_by_ecu_name("ECUNAME")
database_1.get_consumed_services_by_ip("ecu_name","ip",service_id)
database_1.get_ecus_by_ip("ip")
database_1.get_ecu_by_identifier("id")
database_1.get_ecu_by_name("name")
database_1.get_eventgroups_by_service(ServiceID)

Loading Fibex

  • One project can load and make use of several Fibex files

  • Properties of the Fibex file (name and file path) can be inspected and changed in the Properties View

  • After ANDi has parsed the Fibex files completely, all contained ECUs and provided and consumed services can be viewed in the Messages View

  • Scripts can make use of the loaded Fibex files using the provided API and referencing the Fibex files via the specified name

    ../_images/fibexview.png

Getting consumed and provided services

from globals import *

dict_consumed_and_provided_services = {}

# Get all ECUs contained in fibex
for e in database_1.get_all_ecus():
        key = (e.ecu_name, e.ecu_id)

        # Insert key, if not already contained
        if key not in dict_consumed_and_provided_services:
                dict_consumed_and_provided_services[key] = {}

        # Get provided services
        dict_provided_services = {}
        for p in e.provided_services:
                service = p.service
                # Store services using service_id as key and service name as value
                dict_provided_services[service.service_id] = service.name


        # Get consumed services
        dict_consumed_services = {}
        for c in e.consumed_services:
                service = c.service
                # Store services using service_id as key and service name as value
                dict_consumed_services[service.service_id] = service.name

        # Store information
        dict_consumed_and_provided_services[key]["provided"] = dict_provided_services
        dict_consumed_and_provided_services[key]["consumed"] = dict_consumed_services

# Iterate over all ECUs stored previously
for ecu_name, ecu_id in dict_consumed_and_provided_services:
        #Calculate amount of services
        amount_provided = len(dict_consumed_and_provided_services[(ecu_name, ecu_id)]["provided"])
        amount_consumed = len(dict_consumed_and_provided_services[(ecu_name, ecu_id)]["consumed"])
        print("ecu name: {0}, provided: {1}, consumed: {2}".format(ecu_name, amount_provided, amount_consumed))

Accessing methods, fields, events and eventgroups

  • Services can contain methods, events, fields and eventgroups; events and fields can in turn be referenced by an eventgroup

  • This information can therefore only be accessed via the service

from globals import *

# get all consumed services by 'ECUName'
for s in database_1.get_consumed_services_by_ecu_name("ECUName"):
        # iterate over all event groups and print its associated events and fields
        for eg in s.service.event_groups:
                for e in eg.events:
                        print("event: {0}".format(e))

                for f in eg.fields:
                        print("field: {0}".format(f))

        # iterate over all methods of the service interface and print it out
        if s.service.methods:
                for m in s.service.methods:
                        print("method: {0}".format(m))

        # all events defined in the service interface
        if s.service.events:
                for e in s.service.events:
                        print("event in si: {0}".format(e))
  • Further access to the properties of methods, fields, events and eventgroups is possible.

Accessing Input and Output parameters of a method

  • The input and output parameters of a method are available through its input_parameters and return_parameters properties

  • Those properties return a list of parameter objects

from globals import *

# get all consumed services by 'ECUName'
for s in database_1.get_consumed_services_by_ecu_name("ECUName"):
        if s.service.methods:
                for m in s.service.methods:
                        print("method: {0}".format(m))
                        if m.input_parameters:
                                print("inputs")
                                for i in m.input_parameters:
                                        print(i)
                        if m.return_parameters:
                                print("outputs")
                                for i in m.return_parameters:
                                        print(i)

Sending diagnostic messages

# --- Imports

from time import sleep

# --- Constants

## HSFZ port
HSFZ_PORT_SRC = 7811

HSFZ_PORT_DST = 6811

## the ECU diagnostic source address
DIAG_SOURCE_ADDR = 0xF4

## the ECU diagnostic target address
DIAG_TARGET_ADDR = 0x10

_src_ip =  sender.get_ip()
print("sender ip : {}".format(sender.get_ip()))
_src_mac = sender.get_mac()
print("sender mac : {}".format(sender.get_mac()))
_dst_ip = receiver.get_ip()
print("receiver ip : {}".format(receiver.get_ip()))
_dst_mac = receiver.get_mac()
print("receiver mac : {}".format(receiver.get_mac()))

# --- Global variables

my_hsfz_msg_received = False

## Message used to send the HSFZ connect and disconnect requests


def on_msg_received(msg):
        global my_hsfz_msg_received
        print("message received ----------------------------")
        print(msg.ToString())
        if msg.ip_header.ip_address_source == _src_ip and\
           msg.ip_header.ip_address_destination == _dst_ip:
                my_hsfz_msg_received = True
                print(msg.hsfz_payload)
        print("---------------------------------------------")

##################################################################
#                       TEST CASE START                          #
##################################################################

######TEST BODY

# --- Pre initializations
hsfz_msg = message_builder.create_hsfz_message("sender","sender")
msg_hsfz_listener = message_builder.create_hsfz_message("sender","sender")

if hsfz_msg:
        msg_hsfz_listener.ip_header.ip_address_destination = _src_ip
        msg_hsfz_listener.on_message_received += on_msg_received
        #create listener for TCP clients through the receiver network adapter.
        msg_hsfz_listener.start_listener()
        print("Server OK")

        if not hsfz_msg.connected:
                # Set source and destination IP addresses
                hsfz_msg.ip_header.ip_address_source = _src_ip
                hsfz_msg.ip_header.ip_address_destination = _dst_ip
                # Set ports
                hsfz_msg.transport_header.port_source = HSFZ_PORT_SRC
                hsfz_msg.transport_header.port_destination = HSFZ_PORT_DST
                # Set control word
                hsfz_msg.ctr_word = HSFZCtrlWordMapping.CTRLWORD_REQUEST_RESPONSE
                # Set diag addresses
                hsfz_msg.diag.source_address = DIAG_SOURCE_ADDR
                hsfz_msg.diag.target_address = DIAG_TARGET_ADDR
                # Set payload (SVK_LESEN = "F1 01")
                payload = System.Array[Byte](bytearray.fromhex("F1 01"))
                hsfz_msg.diagnostic_payload_data = payload
                # Create connection through the sender
                hsfz_msg.connect()
                print("Client Connected")
                print(hsfz_msg.get_hex_bytes())
                # Send HSFZ message
                hsfz_msg.send()
                print("Message sent")
                sleep(2)

                #***************************************************#
                msg_hsfz_listener.stop_listener()
                if my_hsfz_msg_received:
                        tc_return_success('My hsfz msg received')
                else:
                        tc_return_failure('My hsfz msg not received')

else:
        tc_return_failure('Failed creating hsfz message')

SOME/IP get_input_param and set_input_param

channel = "eth"

# create and configure someip message
someip_msg = message_builder.create_someip_message(channel, channel)
someip_msg.someip_header.service_identifier = service_id
someip_msg.someip_header.method_identifier = method_id

# link to the corresponding database
someip_msg.data_base = data_base

# set input parameter inBOOL to 1
someip_msg.set_input_param("inBOOL", 0x01)

# get input parameter inBOOL
param = someip_msg.get_input_param("inBOOL")

# prints 1
print(param)

# set input parameter inBOOL to 5
someip_msg.set_input_param("inBOOL", 5)

# prints 5
print(someip_msg.get_input_param("inBOOL"))