Using Open/Close Writer in ANDi scripts
Contents
Open Writer
The function open_writer(file, fileFormat)
creates a PCAP or PCAPNG file to write the received messages. If the file already exists, it will be overwritten (old file data will be lost). Use the store()
method to record received messages into the file.
Note
The open_writer method supports the pcap, pcapng, and asc file formats.
Close Writer
The function close_writer()
closes the file previously created with the open_writer
function.
Sample script
from datetime import datetime
# Timeout handed to tc_wait_for_return() further down in this script
# (the _MS suffix suggests milliseconds - confirm against the ANDi API).
G_TIMEOUT_MS = 500
# Callback function, invoked on each incoming SOME/IP message
def on_msg_received(msg):
    """Store a received message via the currently open writer.

    Registered on ``on_message_received``; ``msg.store()`` records the
    message into the file created with ``open_writer``.
    """
    msg.store()
def get_log_file_name(file_path):
    r"""Build a timestamped trace-file path (without extension) for a test case.

    The result has the form
    ``<base>\Logs\ANDi\<YYYYmmddHHMMSS>\<testcase>_ANDi`` where ``<base>`` is
    ``file_path`` with its last three backslash-separated components removed
    and ``<testcase>`` is the file name up to its first dot.

    :param file_path: backslash-separated path of the test case file.
    :return: the generated path as a string; the caller appends the extension.
    """
    parts = file_path.split("\\")
    # Drop the file name and the two folders above it to get the base folder
    trace_file_path = "\\".join(parts[:-3])
    # Test case name: file name up to the first dot
    testcase_name = parts[-1].split(".")[0]
    # The timestamp makes the trace folder unique per execution
    exec_time = datetime.now().strftime('%Y%m%d%H%M%S')
    return "{0}\\Logs\\{1}\\{2}\\{3}_{1}".format(trace_file_path, "ANDi", exec_time, testcase_name)
# Create SOME/IP message
someip_msg = message_builder.create_someip_message("channel_1", "channel_1")
# Trace file: generated base name plus the .pcap extension.
# NOTE(review): file_path is not defined in this listing; presumably it is
# provided by the ANDi script environment - confirm.
save_path = get_log_file_name(file_path) + ".pcap"
# Register callback function
someip_msg.on_message_received += on_msg_received
# Start live capturing for SOME/IP message
someip_msg.start_capture()
# Create a PCAP file to write the received messages
someip_msg.open_writer(save_path)
try:
    # Send messages
    # NOTE(review): indentation was lost in the original listing; only send()
    # is assumed to form the loop body - confirm.
    for _ in range(5):
        someip_msg.send()
    # Block the test case execution until the specified timeout is elapsed
    tc_wait_for_return(G_TIMEOUT_MS)
finally:
    # Cleanup runs even if sending or waiting raises, so the file is closed
    # Close the previously created file
    someip_msg.close_writer()
    # Stop capturing
    someip_msg.stop_capture()
    # De-register callback function
    someip_msg.on_message_received -= on_msg_received
Sample script for using Writer with asc files
from time import sleep
# the callback to be added to on_message_received.
def on_msg_rec(msg):
    """Store a received CAN message into the asc file.

    Registered on ``on_message_received``; ``msg.store()`` records the
    message into the file created with ``open_writer``.
    """
    msg.store()
# create a can message.
message = message_builder.create_can_message(sender_channel.name, receiver_channel.name)
# open writer to specific path.
message.open_writer("Path_to_file.asc")
# add on_message_received callback.
message.on_message_received += on_msg_rec
# start capturing incoming can messages.
message.start_capture()
try:
    # keep capturing for 60 seconds.
    sleep(60)
finally:
    # cleanup runs even if the wait is interrupted, so the file is closed.
    # after getting the needed messages, stop capturing.
    message.stop_capture()
    # close the writer before finishing the script.
    message.close_writer()
    # remove the callback from the on_message_received event.
    message.on_message_received -= on_msg_rec
Sample script for capturing mixed CAN and LIN traffic to asc file
from time import sleep
# the callback to be added to on_message_received.
def on_msg_rec(msg):
    """Store the CAN and/or LIN layer of a received message into the asc file.

    Each layer is looked up on ``msg`` and stored only when the lookup
    returns a truthy value (presumably a falsy value means the message has
    no such layer - confirm against the ANDi API).
    """
    can = msg.get_can_layer()
    if can:
        can.store()
    lin = msg.get_lin_layer()
    if lin:
        lin.store()
# create the message object used for capturing (built with
# create_ethernet_message, although the callback stores CAN/LIN layers).
message = message_builder.create_ethernet_message(sender_channel.name, receiver_channel.name)
# open writer to specific path.
message.open_writer("Path_to_file.asc")
# add on_message_received callback.
message.on_message_received += on_msg_rec
# start capturing incoming messages.
message.start_capture()
try:
    # keep capturing for 60 seconds.
    sleep(60)
finally:
    # cleanup runs even if the wait is interrupted, so the file is closed.
    # after getting the needed messages, stop capturing.
    message.stop_capture()
    # close the writer before finishing the script.
    message.close_writer()
    # remove the callback from the on_message_received event.
    message.on_message_received -= on_msg_rec