TCP Connections
There are several ways to create sockets:
Python
.Net
ANDi TCP message object
Caution with disconnect:
Windows does not close a socket immediately. Thus, a
disconnect()
followed by a connect()
to the same port can lead to an error because the socket remains in use until Windows releases it. ANDi tries to avoid those errors. When
msg_tcp.connect()
is called, ANDi tries to reuse a socket. Furthermore, if reusing is not possible, ANDi tries for a certain amount of time to establish a connection and catches the exceptions. If a new socket connection is required at all costs, then msg_tcp.connect(True)
can be used.
TCP Server
try:
    # Create the TCP message object used as the server side of the example.
    tcp_msg = message_builder.create_tcp_message("sender", "receiver")

    # Source address and port configured before the listener is started.
    tcp_msg.ip_header.ip_address_source = "192.168.0.50"
    tcp_msg.transport_header.port_source = 65535

    # Start the listener on the message object.
    tcp_msg.start_listener()
except Exception as e:
    # Report whatever went wrong while creating or starting the listener.
    error_text = "{0}".format(e)
    print(error_text)
TCP Client
from time import sleep

try:
    # Create TCP message
    tcp_msg = message_builder.create_tcp_message(adapter0.name, adapter0.name)
    # Set destination MAC and IP
    tcp_msg.ethernet_header.mac_address_destination = "00:11:22:33:44:66"
    tcp_msg.ip_header.ip_address_destination = "192.168.2.11"
    # Set source port and destination port
    tcp_msg.transport_header.port_source = 49152
    tcp_msg.transport_header.port_destination = 30501
    # Connect to server
    tcp_msg.connect()
    # Give the connection attempt some time before checking its state
    sleep(0.5)
    if tcp_msg.connected:
        print("connection established")
    else:
        # Fill the placeholders with the destination that was actually used
        print("TCP connection attempt to IP address {0} and Port {1} refused".format(
            tcp_msg.ip_header.ip_address_destination,
            tcp_msg.transport_header.port_destination))
        print("connection not established")
except Exception as e:
    print("{0}".format(e))
    print("connection not established")
Diagnosis
One can use TCP message objects for sending diagnosis messages, but in this case the encoding and decoding of the payload must be done manually in compliance with the UDS protocol.
Thus, it is better to use HSFZ message objects.
The general approach is as follows:
Create HSFZ message object
Set IP addresses (src, dst)
Set ports (src, dst)
Set diagnosis message using
msg_diag.diagnostic_payload_data
Perform connection using
msg_diag.connect()
Reception is done via
on_msg_received(msg)
Perform
Disconnect()
to close the connection
ECU reset
from globals import *
from time import sleep
# Flag polled by the wait loop of this script; True means the reset was confirmed.
reset_success = False
# Request payload bytes: service 0x11 with sub-function 0x01 (hard reset).
diag_command = [0x11, 0x01]
# Build the HSFZ diagnosis message that is used for resetting the ECU
def create_diagnosis_message():
    """Create an HSFZ message object and pre-configure its addressing.

    The message is created on ``adapter0`` (used as both sender and
    receiver name), gets the adapter's IP as source address, port 6801 on
    both sides, the request/response control word and the diagnostic
    source/target addresses. The payload is NOT set here.

    Returns:
        The configured HSFZ message object as returned by
        ``message_builder.create_hsfz_message``.
    """
    hsfz_request = message_builder.create_hsfz_message(adapter0.name, adapter0.name)

    # IP layer: the source is the adapter's own address
    hsfz_request.ip_header.ip_address_source = adapter0.get_ip()
    # TODO: set destination ip and mac

    # Transport layer: same port number on both ends
    hsfz_request.transport_header.port_source = 6801
    hsfz_request.transport_header.port_destination = 6801

    # HSFZ control word
    hsfz_request.ctr_word = HSFZCtrlWordMapping.CTRLWORD_REQUEST_RESPONSE

    # Diagnostic addresses (source and target)
    hsfz_request.diag.source_address = 0xF4
    hsfz_request.diag.target_address = 0x06

    return hsfz_request
def on_diag_msg_received(hsfz_msg):
    """Receive callback: flag a positive response to the reset request.

    Sets the module-level ``reset_success`` to True when the received HSFZ
    message carries the request/response control word and its payload
    starts with ``diag_command[0] + 0x40`` followed by ``diag_command[1]``.

    Args:
        hsfz_msg: Received HSFZ message object; ``ctr_word`` and
            ``diagnostic_payload_data`` are read from it.
    """
    global reset_success

    if hsfz_msg.ctr_word != HSFZCtrlWordMapping.CTRLWORD_REQUEST_RESPONSE:
        return

    raw_payload = hsfz_msg.diagnostic_payload_data
    # Check for a missing payload BEFORE converting it; bytearray(None) raises.
    if raw_payload is None:
        return

    response_payload = bytearray(raw_payload)
    # Two bytes are inspected below, so shorter payloads cannot match.
    if len(response_payload) < 2:
        return

    # First byte must be the request service id plus 0x40, second byte must
    # echo the requested sub-function.
    if (response_payload[0] == diag_command[0] + 0x40
            and response_payload[1] == diag_command[1]):
        reset_success = True
# Create diag message
diag_response = create_diagnosis_message()
if diag_response:
    try:
        # Assign payload
        diag_response.diagnostic_payload_data = System.Array[Byte](diag_command)
        # Register the receive callback before the request goes out
        diag_response.on_message_received += on_diag_msg_received
        diag_response.connect()
        diag_response.send()
        # Wait for an answer for roughly max_wait seconds (about 5 s),
        # checking the success flag every poll_interval seconds
        max_wait = 5
        poll_interval = 0.3
        while max_wait > 0 and not reset_success:
            max_wait -= poll_interval
            sleep(poll_interval)
    except Exception as e:
        # Include the exception text so the cause is not lost
        print("Exception occurred when resetting ECU: {0}".format(e))
    finally:
        # Always unregister the callback, even when an error occurred
        diag_response.on_message_received -= on_diag_msg_received
        # NOTE(review): the connection is never closed here - confirm whether
        # a disconnect call is required for HSFZ message objects.
SOME/IP over TCP example
# --- Constants
# SOME/IP header values used for the request and for its verification
SERVICE_ID = 0x0101
METHOD_ID = 0x0002
# Message id: service id in the upper 16 bits, method id in the lower 16 bits
MESSAGE_ID = (SERVICE_ID << 16) | METHOD_ID
CLIENT_ID = 0x3245
SESSION_ID = 0x3789
# Request id: client id in the upper 16 bits, session id in the lower 16 bits
REQUEST_ID = (CLIENT_ID << 16) | SESSION_ID
PROTOCOL_VERSION = 0x01
INTERFACE_VERSION = 0x01
MESSAGE_TYPE = MessageType.REQUEST
RETURN_CODE = ReturnCode.E_OK

# Transport and network addressing of client (source) and server (destination)
SOURCE_PORT = 30491
DESTINATION_PORT = 30492
SRC_IP = "192.168.0.1"
DST_IP = "192.168.0.2"
DST_MAC = "11:22:33:44:55:66"
SRC_MAC = "11:22:33:44:55:77"

# Timeout handed to tc_wait_for_return (milliseconds, going by the name)
G_TIMEOUT_MS = 1000

# --- Global variables
# Message object sent by the client; assigned in the pre-initialization part
someip_msg = None
# First SOME/IP message accepted by the listener callback (None until then)
my_someip_msg_received = None
# True once a matching frame was seen by the Ethernet capture callback
my_someip_msg_eth_received = False
## Set collecting all the error messages found
set_errors = set()
# --- Functions
## Shows the errors found (if any) and determines the test result
def tc_get_result():
    """Print the result banner and report success or failure.

    Reads the module-level ``my_someip_msg_received``,
    ``my_someip_msg_eth_received`` and ``set_errors`` and reports:

    * failure when the listener received no SOME/IP message,
    * failure when no matching frame was seen by the Ethernet capture,
    * failure (after printing every collected error) when header checks
      failed,
    * success otherwise.
    """
    print("-------------------------------------------------------------------")
    print("----------------------- RESULT ----------------------------------")
    print('EXPECTED: SOMEIP message is received succesfully over TCP and the '\
    'connection is open/closed properly ')

    if not my_someip_msg_received:
        tc_return_failure('** Failure: No SOME/IP message received')
    elif not my_someip_msg_eth_received:
        # The listener got the message, but the capture callback never saw a
        # matching frame - report that instead of "no message received".
        tc_return_failure('** Failure: SOME/IP message received by the listener, '
                          'but no matching frame was captured on Ethernet')
    elif set_errors:
        for error in set_errors:
            print(error)
        tc_return_failure('** Failure: SOME/IP message was not correctly '
                          'received')
    else:
        tc_return_success('** Success: OK. SOME/IP message received exactly, '
                          'as it was sent')
def tc_stop_listening(tcp_msg):
    """Stop the listener of the given message object.

    Args:
        tcp_msg: Message object providing ``stop_listener()``.
    """
    listener_owner = tcp_msg
    listener_owner.stop_listener()
def tc_clear(msg, call_back_function):
    """Stop capturing on ``msg`` and unregister its receive callback.

    Args:
        msg: Message object providing ``stop_capture()`` and the
            ``on_message_received`` event.
        call_back_function: Handler to remove from ``on_message_received``.
    """
    # Stop the capture first, then detach the handler
    msg.stop_capture()
    msg.on_message_received -= call_back_function
def tc_prepare_listening(someip_msg, call_back_function):
    """Register the receive callback and start the listener.

    Args:
        someip_msg: Message object providing the ``on_message_received``
            event and ``start_listener()``.
        call_back_function: Handler added to ``on_message_received``.
    """
    # Attach the handler before the listener starts
    someip_msg.on_message_received += call_back_function
    someip_msg.start_listener()
def tc_prepare_capture(msg, call_back_function):
    """Register the receive callback on ``msg`` and start capturing.

    Args:
        msg: Message object providing the ``on_message_received`` event and
            ``start_capture()``.
        call_back_function: Handler added to ``on_message_received``.
    """
    # Attach the handler first, then start the capture
    msg.on_message_received += call_back_function
    msg.start_capture()
def on_msg_eth_received(msg):
    """Capture callback: note when the expected SOME/IP/TCP frame is seen.

    Sets the module-level ``my_someip_msg_eth_received`` to True when the
    message has both a TCP and a SOME/IP layer and the SOME/IP layer's
    ports equal ``SOURCE_PORT`` / ``DESTINATION_PORT``.

    Args:
        msg: Captured message object providing ``has_layer`` / ``get_layer``.
    """
    global my_someip_msg_eth_received

    # Both layers must be present; anything else is ignored
    if not msg.has_layer(PROTOCOL_TYPE.TCP):
        return
    if not msg.has_layer(PROTOCOL_TYPE.SOMEIP):
        return

    someip_layer = msg.get_layer(PROTOCOL_TYPE.SOMEIP)
    if check_port(someip_layer, SOURCE_PORT, DESTINATION_PORT):
        my_someip_msg_eth_received = True
def on_msg_received(msg):
    """Listener callback: store and verify the first matching SOME/IP message.

    A message is accepted when ``check_ip_adress`` passes. The first
    accepted message is stored in ``my_someip_msg_received``, its SOME/IP
    header is compared against the expected constants, the resulting error
    texts are merged into ``set_errors`` and ``tc_return_continue()`` is
    called.

    Args:
        msg: Received message object.
    """
    global my_someip_msg_received, set_errors

    if not check_ip_adress(msg):
        return
    if my_someip_msg_received:
        # Only the first accepted message is evaluated
        return

    my_someip_msg_received = msg
    # Verify the RECEIVED message. The expected length is taken from the
    # message that was sent, since no length constant is defined.
    header_errors = check_someip_header(
        my_someip_msg_received,
        MESSAGE_ID, SERVICE_ID, METHOD_ID,
        someip_msg.someip_header.length,
        REQUEST_ID, CLIENT_ID, SESSION_ID,
        PROTOCOL_VERSION, INTERFACE_VERSION,
        MESSAGE_TYPE, RETURN_CODE)
    set_errors = set_errors | header_errors
    # Presumably releases the tc_wait_for_return() of the test steps
    tc_return_continue()
def check_port(msg, src_port, dst_port):
    """Tell whether the message's ports equal the given pair.

    Args:
        msg: Object whose ``transport_header`` has ``port_source`` and
            ``port_destination``.
        src_port: Expected source port.
        dst_port: Expected destination port.

    Returns:
        True when both ports match, otherwise False.
    """
    # The destination port is only inspected when the source port matches
    if msg.transport_header.port_source == src_port:
        if msg.transport_header.port_destination == dst_port:
            return True
    return False
def check_someip_header(msg, MESSAGE_ID, SERVICE_ID, METHOD_ID, LENGTH, REQUEST_ID, CLIENT_ID, SESSION_ID, PROTOCOL_VERSION, INTERFACE_VERSION, MESSAGE_TYPE, RETURN_CODE):
    """Compare the SOME/IP header of ``msg`` with the expected values.

    Args:
        msg: Message object whose ``someip_header`` is inspected.
        MESSAGE_ID, SERVICE_ID, METHOD_ID, LENGTH, REQUEST_ID, CLIENT_ID,
        SESSION_ID, PROTOCOL_VERSION, INTERFACE_VERSION: Expected values of
            the header fields of the same name.
        MESSAGE_TYPE, RETURN_CODE: Expected message type / return code,
            either as a plain integer or as an object offering
            ``GetHashCode()``.

    Returns:
        A set with one error text per mismatching field; empty when the
        header matches completely.
    """

    def _as_int(value):
        # Received type/code values are reduced to an int via GetHashCode();
        # do the same for the expected value so both sides are comparable.
        # Plain integers are used as they are.
        get_hash = getattr(value, "GetHashCode", None)
        return int(get_hash()) if get_hash is not None else int(value)

    def _hex_digits(value):
        # The messages carry a "0x" prefix, so render the digits in hex;
        # fall back to the plain text for values that are not integers.
        try:
            return "{0:X}".format(int(value))
        except (TypeError, ValueError):
            return "{0}".format(value)

    header = msg.someip_header
    # (message template, received value, expected value) per header field
    checks = (
        ("wrong message id received 0x{0}, valid message id is 0x{1}",
         header.message_id, MESSAGE_ID),
        ("wrong service id received 0x{0}, valid service id is 0x{1}",
         header.service_identifier, SERVICE_ID),
        ("wrong method id received 0x{0}, valid method id is 0x{1}",
         header.method_identifier, METHOD_ID),
        ("wrong length received 0x{0}, valid length is 0x{1}",
         header.length, LENGTH),
        ("wrong request id received 0x{0}, valid request id is 0x{1}",
         header.request_id, REQUEST_ID),
        ("wrong client id received 0x{0}, valid client id calculated is 0x{1}",
         header.client_id, CLIENT_ID),
        ("wrong session id received 0x{0}, valid session id is 0x{1}",
         header.session_id, SESSION_ID),
        ("wrong protocol version received 0x{0}, valid protocol version is 0x{1}",
         header.protocol_version, PROTOCOL_VERSION),
        ("wrong interface version received 0x{0}, valid interface version is 0x{1}",
         header.interface_version, INTERFACE_VERSION),
        ("wrong message type received 0x{0}, valid messages type is 0x{1}",
         _as_int(header.message_type), _as_int(MESSAGE_TYPE)),
        ("wrong return code received 0x{0}, valid return code is 0x{1}",
         _as_int(header.return_code), _as_int(RETURN_CODE)),
    )

    set_errors = set()
    for template, received, expected in checks:
        if received != expected:
            set_errors.add(template.format(_hex_digits(received),
                                           _hex_digits(expected)))
    return set_errors
def check_ip_adress(msg):
    """Tell whether ``msg`` is addressed from ``SRC_IP`` to ``DST_IP``.

    Args:
        msg: Message object whose ``ip_header`` has
            ``ip_address_destination`` and ``ip_address_source``.

    Returns:
        True when both addresses match the module constants, otherwise
        False (an explicit bool instead of an implicit None).
    """
    if msg.ip_header.ip_address_destination == DST_IP and \
            msg.ip_header.ip_address_source == SRC_IP:
        return True
    return False
def initialize_someip_ip_header(someip_msg):
    """Write ``DST_IP`` / ``SRC_IP`` into the IP header of ``someip_msg``.

    Args:
        someip_msg: Message object whose ``ip_header`` is configured.
    """
    # Destination first, then source
    for field_name, address in (("ip_address_destination", DST_IP),
                                ("ip_address_source", SRC_IP)):
        setattr(someip_msg.ip_header, field_name, address)
def initialize_tcp_connection(msg, src_ip, dst_ip, src_port, dst_port):
    """Configure IP addresses and ports of ``msg``.

    Args:
        msg: Message object with ``ip_header`` and ``transport_header``.
        src_ip: Value for ``ip_header.ip_address_source``.
        dst_ip: Value for ``ip_header.ip_address_destination``.
        src_port: Value for ``transport_header.port_source``.
        dst_port: Value for ``transport_header.port_destination``.
    """
    # Network layer
    setattr(msg.ip_header, "ip_address_source", src_ip)
    setattr(msg.ip_header, "ip_address_destination", dst_ip)
    # Transport layer
    setattr(msg.transport_header, "port_source", src_port)
    setattr(msg.transport_header, "port_destination", dst_port)
def initialize_client(someip_msg):
    """Configure ``someip_msg`` as the client-side request message.

    Sets source/destination addressing from the module constants and fills
    the SOME/IP message fields with the expected header constants.

    Args:
        someip_msg: Message object to configure.
    """
    # Addressing: client is the source, server the destination
    initialize_tcp_connection(
        someip_msg, SRC_IP, DST_IP, SOURCE_PORT, DESTINATION_PORT)

    # Ethernet/IP headers plus SOME/IP header fields
    initialize_someip_message(
        someip_msg,
        SERVICE_ID, METHOD_ID,
        CLIENT_ID, SESSION_ID,
        PROTOCOL_VERSION, INTERFACE_VERSION,
        MESSAGE_TYPE, RETURN_CODE)
def initialize_someip_message_header(someip_msg):
    """Initialize the Ethernet header, then the IP header, of ``someip_msg``.

    Args:
        someip_msg: Message object to configure.
    """
    # Apply the lower-layer initializers in their original order
    for initialize_layer in (initialize_someip_ethernet_header,
                             initialize_someip_ip_header):
        initialize_layer(someip_msg)
def initialize_someip_ethernet_header(someip_msg):
    """Write ``DST_MAC`` / ``SRC_MAC`` into the Ethernet header.

    Args:
        someip_msg: Message object whose ``ethernet_header`` is configured.
    """
    # Destination first, then source
    for field_name, mac in (("mac_address_destination", DST_MAC),
                            ("mac_address_source", SRC_MAC)):
        setattr(someip_msg.ethernet_header, field_name, mac)
def initialize_someip_message(someip_msg, service_id ,method_id,
                              client_id, session_id, protocol_version,
                              interface_version, message_type, return_code):
    """Initialize the lower-layer headers and the SOME/IP header.

    Args:
        someip_msg: Message object to configure.
        service_id, method_id, client_id, session_id, protocol_version,
        interface_version, message_type, return_code: Values forwarded
            unchanged to ``initialize_someip_header``.
    """
    # Ethernet and IP headers come from the module constants
    initialize_someip_message_header(someip_msg)

    # SOME/IP header fields come from the arguments
    header_values = (service_id, method_id, client_id, session_id,
                     protocol_version, interface_version, message_type,
                     return_code)
    initialize_someip_header(someip_msg, *header_values)
def initialize_someip_header(someip_msg, service_id ,method_id, client_id, session_id,
                             protocol_version, interface_version, message_type,
                             return_code):
    """Fill the SOME/IP header fields of ``someip_msg``.

    Args:
        someip_msg: Message object whose ``someip_header`` is written.
        service_id: Value for ``service_identifier``.
        method_id: Value for ``method_identifier``.
        client_id: Value for ``client_id``.
        session_id: Value for ``session_id``.
        protocol_version: Value for ``protocol_version``.
        interface_version: Value for ``interface_version``.
        message_type: Value for ``message_type``.
        return_code: Value for ``return_code``.
    """
    # Header attribute name paired with the value to store, in write order
    assignments = (
        ("service_identifier", service_id),
        ("method_identifier", method_id),
        ("client_id", client_id),
        ("session_id", session_id),
        ("protocol_version", protocol_version),
        ("interface_version", interface_version),
        ("message_type", message_type),
        ("return_code", return_code),
    )
    for attribute, value in assignments:
        setattr(someip_msg.someip_header, attribute, value)
def initialize_server(someip_msg):
    """Configure ``someip_msg`` with the server-side addressing.

    Mirror image of the client configuration: ``DST_IP`` /
    ``DESTINATION_PORT`` become the source and ``SRC_IP`` / ``SOURCE_PORT``
    the destination.

    Args:
        someip_msg: Message object to configure.
    """
    own_ip, peer_ip = DST_IP, SRC_IP
    own_port, peer_port = DESTINATION_PORT, SOURCE_PORT
    initialize_tcp_connection(someip_msg, own_ip, peer_ip, own_port, peer_port)
# @param file_path Absolute path of the test case that is starting
# @param ecu_name  Name of the ECU under test. Only needed when testing a specific ECU
def tc_start(file_path, ecu_name="ANDi"):
    """Print the test case banner and start the PCAP recording.

    The trace file name is ``file_path + "Log_File"`` with a ``.pcap``
    extension; the base name (without extension) is stored in the
    module-level ``log_file_name``.

    Args:
        file_path: Path the test case name and the log file name are
            derived from. A trailing backslash is tolerated.
        ecu_name: Name of the ECU under test; only ``"ANDi"`` triggers an
            extra informational line.

    Returns:
        Always True. No activity check is performed here, the ECU is simply
        assumed to be active.
    """
    global log_file_name
    ecu_active = True

    # Drop trailing separators first: for a directory path such as
    # "...\\pcap\\" the last path element would otherwise be empty.
    last_element = file_path.rstrip("\\").split("\\")[-1]
    testcase_name = last_element.split(".")[0]
    print("TEST CASE << {0} >>".format(testcase_name.upper()))
    print("**************************************************************")

    # Start PCAP recording
    log_file_name = file_path + "Log_File"
    print("Recording trace to the following file: {0}.pcap".format(log_file_name))
    channel_1.start_record("{0}.pcap".format(log_file_name))

    if ecu_name == "ANDi":
        print("Starting test to check messages from/to ANDi tool...")
        print("**************************************************************")
    return ecu_active
# --- Pre initializations
# Client message, listener (server) message and Ethernet capture message
someip_msg = message_builder.create_someip_message("channel_1", "channel_1")
l_someip_msg = message_builder.create_someip_message("channel_1", "channel_1")
l_eth_msg = message_builder.create_ethernet_message("channel_1", "channel_1")

# --- Test steps
# Validate the created objects BEFORE any of their attributes is touched
if someip_msg and l_someip_msg and l_eth_msg is not None:
    someip_msg.protocol = Protocols.TCP
    l_someip_msg.protocol = Protocols.TCP

    ## Call function to start capturing a PCAP
    tc_start(someip_msg.TestProject.DirectoryName + "\\pcap\\")

    # Tracks whether the listener was started, so cleanup only stops what runs
    listener_started = False
    try:
        tc_prepare_capture(l_eth_msg, on_msg_eth_received)
        print("Initialize client...")
        initialize_client(someip_msg)
        print("Initialize server...")
        initialize_server(l_someip_msg)
        print("Someip server starting...")
        tc_prepare_listening(l_someip_msg, on_msg_received)
        listener_started = True
        if not someip_msg.connected:
            print("Establishing connection... ")
            someip_msg.connect()
        print("Sending SOME/IP message... ")
        someip_msg.send()
        print("Verify SOME/IP message... ")
        tc_wait_for_return(G_TIMEOUT_MS)
    except Exception as e:
        # Keep the exception text, not only its type name
        print("{0}: {1}".format(type(e).__name__, e))
    finally:
        # Release connection, listener and handlers even after an error, so
        # that no socket or callback is left behind.
        print("Deconnection... ")
        if someip_msg.connected:
            someip_msg.disconnect()
        if listener_started:
            tc_stop_listening(l_someip_msg)
        # on_msg_received was registered on the listener message object, so
        # it has to be removed from that same object.
        l_someip_msg.on_message_received -= on_msg_received
        tc_clear(l_eth_msg, on_msg_eth_received)

    # Evaluate the flags and errors collected by the callbacks
    tc_get_result()
else:
    tc_return_failure('Failed creating Someip message')