Using Open/Close Writer in ANDi scripts

Open Writer

The function open_writer(file, fileFormat) creates a PCAP, PCAPNG or ASC file to which the received messages are written. If the file already exists, it is overwritten and its previous contents are lost. Use the store() method to record received messages into the file.

Note

  • The open_writer method supports the pcap, pcapng and asc file formats.

Close Writer

The function close_writer() closes the file previously created with the open_writer function.

Sample script

from datetime import datetime

G_TIMEOUT_MS = 500

# Callback function, invoked on each incoming SOME/IP message
def on_msg_received(msg):
    """Store the received message by calling its store() method.

    Args:
        msg: The received message object passed in by the
            on_message_received event.
    """
    msg.store()
    
def get_log_file_name(file_path):
    r"""Build a timestamped trace-file path (without extension) for a test case.

    The result has the form
    ``<base>\Logs\ANDi\<YYYYmmddHHMMSS>\<testcase>_ANDi`` where ``<base>`` is
    ``file_path`` with its last three backslash-separated components removed
    and ``<testcase>`` is the file name without its final extension.

    Args:
        file_path: Backslash-separated path of the test case file.

    Returns:
        The trace file path as a string; the caller appends the extension
        (for example ".pcap").
    """
    # Split once and reuse the parts for both the base folder and the file name
    path_parts = file_path.split("\\")
    # Drop the last three components (file name and its two parent folders)
    trace_file_path = "\\".join(path_parts[:-3])
    # Strip only the final extension so a dotted name such as "tc.v2.py"
    # yields "tc.v2" instead of being truncated at the first dot
    testcase_name = path_parts[-1].rsplit(".", 1)[0]
    # The timestamp keeps trace files from different runs apart
    exec_time = datetime.now().strftime('%Y%m%d%H%M%S')
    # Generate unique trace file name
    return "{0}\\Logs\\{1}\\{2}\\{3}_{1}".format(trace_file_path, "ANDi", exec_time, testcase_name)

# Create SOME/IP message ("channel_1" is passed for both channel arguments)
someip_msg = message_builder.create_someip_message("channel_1" , "channel_1")

# Build the trace file path and append the .pcap extension.
# NOTE(review): file_path is not defined in this script; presumably it is
# supplied by the ANDi runtime as the path of the test case file - confirm.
save_path = get_log_file_name(file_path) + ".pcap"

# Register callback function (it stores every received message)
someip_msg.on_message_received += on_msg_received

# Start live capturing for SOME/IP message
someip_msg.start_capture()

# Create a PCAP file to write the received messages
# (an already existing file at save_path is overwritten)
someip_msg.open_writer(save_path)

# Send the message five times
for i in range(0,5):
    someip_msg.send()

# Block the test case execution until the specified timeout (G_TIMEOUT_MS) is elapsed
tc_wait_for_return(G_TIMEOUT_MS)

# Close the previously created file
someip_msg.close_writer()

# Stop capturing
someip_msg.stop_capture()

# De-register callback function
someip_msg.on_message_received -= on_msg_received 

Sample script for using Writer with asc files

from time import sleep

# the callback to be added to on_message_received.
def on_msg_rec(msg):
    """Store the received message by calling its store() method.

    Args:
        msg: The received message object passed in by the
            on_message_received event.
    """
    # will store the can message to asc file.
    msg.store()

# create a can message.
# NOTE(review): sender_channel and receiver_channel are not defined in this
# script; presumably they are channel objects provided by the ANDi project - confirm.
message = message_builder.create_can_message(sender_channel.name, receiver_channel.name)

# open writer to specific path (the .asc extension selects an asc trace file).
message.open_writer("Path_to_file.asc")

# add on_message_received callback.
message.on_message_received += on_msg_rec

# start capturing incoming can messages.
message.start_capture()

# keep the script alive for 60 seconds while messages are captured.
sleep(60)

# after getting the needed messages, stop capturing.
message.stop_capture()
# close the writer before finishing the script.
message.close_writer()

# remove the callback from the on_message_received event.
message.on_message_received -= on_msg_rec

Sample script for capturing mixed CAN and LIN traffic to asc file

from time import sleep

# the callback to be added to on_message_received.
def on_msg_rec(msg):
    """Store the CAN and/or LIN layer of a received message.

    Each layer is requested from the message in turn (CAN first, then LIN)
    and stored only when the message returns a truthy layer object.

    Args:
        msg: The received message object passed in by the
            on_message_received event.
    """
    # store the can part of the message, if there is one.
    can_layer = msg.get_can_layer()
    if can_layer:
        can_layer.store()
    # store the lin part of the message, if there is one.
    lin_layer = msg.get_lin_layer()
    if lin_layer:
        lin_layer.store()

# create an ethernet message; the callback reads the can / lin layers
# from each message received on it.
# NOTE(review): sender_channel and receiver_channel are not defined in this
# script; presumably they are channel objects provided by the ANDi project - confirm.
message = message_builder.create_ethernet_message(sender_channel.name, receiver_channel.name)

# open writer to specific path (the .asc extension selects an asc trace file).
message.open_writer("Path_to_file.asc")

# add on_message_received callback.
message.on_message_received += on_msg_rec

# start capturing incoming messages.
message.start_capture()

# keep the script alive for 60 seconds while messages are captured.
sleep(60)

# after getting the needed messages, stop capturing.
message.stop_capture()
# close the writer before finishing the script.
message.close_writer()

# remove the callback from the on_message_received event.
message.on_message_received -= on_msg_rec