[CAN-FD] Vehicle Diagnostic Communication Part 94 [ISO-TP 2]


Click here for back issues.
https://www.simulationroom999.com/blog/diagnostic-communication-en-back-issue/

Introduction

This installment continues the CAN-FD simulation with can-isotp.
Here we write the Python code for the transmitting and receiving sides.

Python code for CAN-FD request side can-isotp

Let’s get straight to the code.

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0, 
         'blocksize' : 0,
         'wftmax' : 0,
         'll_data_length' : 64,
         'tx_padding' : 0x55,
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : True,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      #self.bus = VectorBus(channel='0', bitrate=500000)
      self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.is_alive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while not self.exit_requested:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001)                   # Fixed 1 ms sleep between process() calls

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def sendrecv( app, msg ):
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.2)


if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07')
   
   # 60-byte message ("…" marks bytes elided in this listing)
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')
   
   # 120-byte message
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')
   
   # 5000-byte message
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')

   print("Exiting")
   app.shutdown()
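
The three longer payloads in the listing are elided with "…", so their exact contents are not shown. As a minimal sketch, assuming each payload simply repeats the ten bytes visible in the listing (an assumption, not something the article states), a hypothetical helper `make_payload` could build test messages of any length:

```python
from itertools import cycle, islice

# Assumption: the elided payloads repeat the ten bytes visible in the listing.
PATTERN = bytes.fromhex("01020304050607080910")

def make_payload(length):
    """Return `length` bytes by cycling through PATTERN (hypothetical helper)."""
    return bytes(islice(cycle(PATTERN), length))

# Lengths used in this article: 60, 120 and 5000 bytes.
for n in (60, 120, 5000):
    payload = make_payload(n)
    print(n, len(payload), payload[:10].hex())
```

With such a helper, a call like `sendrecv(app, make_payload(5000))` could stand in for the elided literal.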

Python code explanation for CAN-FD request side can-isotp

Here the link-layer data length is set to 64 bytes and CAN-FD is enabled.
The code exercises the following cases.

  • SingleFrame of 7 bytes or less
  • SingleFrame of 8 bytes or more
  • MultiFrame of 4095 bytes or less
  • MultiFrame of 4096 bytes or more

In other words, the code focuses only on the points that change with CAN-FD.
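
The four cases can be told apart by the first bytes of the first frame. The following is a minimal sketch of that decision based on the ISO 15765-2 frame formats, assuming normal or normal-fixed addressing (no address byte in the payload). The function name `first_frame_header` is hypothetical and is not part of can-isotp.

```python
def first_frame_header(length, tx_data_length=64):
    """Return (frame type, PCI bytes) for the first frame of a message.

    Assumes no address byte is carried in the CAN payload.
    """
    if length < 1:
        raise ValueError("payload must contain at least one byte")
    if length <= 7:
        # Classic SingleFrame: length in the low nibble of the first byte.
        return "SingleFrame", bytes([length])
    if length <= tx_data_length - 2:
        # CAN-FD SingleFrame: low nibble is 0, length is in the second byte.
        return "SingleFrame (escape)", bytes([0x00, length])
    if length <= 4095:
        # FirstFrame with a 12-bit length.
        return "FirstFrame", bytes([0x10 | (length >> 8), length & 0xFF])
    # FirstFrame with a 32-bit length after the 0x10 0x00 escape.
    return "FirstFrame (escape)", bytes([0x10, 0x00]) + length.to_bytes(4, "big")

# The four message lengths sent by the request side.
for n in (7, 60, 120, 5000):
    kind, pci = first_frame_header(n)
    print(n, kind, pci.hex())
```

Under these assumptions the 60-byte message still fits in one 64-byte frame, while the 120-byte and 5000-byte messages become MultiFrame transfers with a 12-bit and a 32-bit length field respectively.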

Python code for CAN-FD response side can-isotp

The response side, in turn, uses the following code.

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0,                           # Minimum separation time requested from the sender; 0 means no wait between consecutive frames
         'blocksize' : 0,                       # 0 means the sender may send all consecutive frames without waiting for another flow control message
         'wftmax' : 0,                          # Number of wait frames allowed before triggering an error
         #'ll_data_length' : 8,                  # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
         'll_data_length' : 16,                 # Link layer (CAN layer) works with 16 byte payload (CAN-FD)
         'tx_padding' : 0xCC,                   # Will pad all transmitted CAN messages with byte 0xCC. None means no padding
         'rx_flowcontrol_timeout' : 1000,        # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
         'rx_consecutive_frame_timeout' : 1000, # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
         'squash_stmin_requirement' : False,     # When sending, respect the stmin requirement of the receiver. If set to True, go as fast as possible.
         'can_fd' : True,
         'tx_data_min_length' : 8,
         'max_frame_size' : 6000
      }
      self.exit_requested = False
      #self.bus = VectorBus(channel='0', bitrate=500000)
      self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x10, target_address=0xF1) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.is_alive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while not self.exit_requested:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001)                   # Fixed 1 ms sleep between process() calls

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def recvsend( app, msg ):
   while True:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.0001)
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         break

if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')

   print("Exiting")
   app.shutdown()

Python code explanation for CAN-FD response side can-isotp

The parameter “max_frame_size” is added to isotp_params.

By default, the largest message that can be received is 4095 bytes.
The request side sends a 5000-byte message,
so the receive limit on the response side must be larger than that.
In this article, it is set to 6000 bytes.
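
What a too-small limit leads to can be sketched as follows. Under ISO 15765-2, a receiver that cannot accept the length announced in a FirstFrame answers with a FlowControl frame whose flow status is Overflow (2) instead of ContinueToSend (0). The function name and its defaults are illustrative assumptions, not can-isotp's API.

```python
CONTINUE_TO_SEND = 0  # Flow status: the sender may continue
OVERFLOW = 2          # Flow status: the announced message is too long

def flow_control_for(announced_length, max_frame_size=4095, blocksize=0, stmin=0):
    """Build the 3-byte FlowControl header a receiver would answer with."""
    if announced_length <= max_frame_size:
        status = CONTINUE_TO_SEND
    else:
        status = OVERFLOW
    # Byte 0: frame type 3 in the high nibble, flow status in the low nibble.
    return bytes([0x30 | status, blocksize, stmin])

# A 5000-byte request against the default limit and against the 6000-byte limit.
print(flow_control_for(5000).hex())
print(flow_control_for(5000, max_frame_size=6000).hex())
```

In this sketch the 5000-byte request is refused with the default limit and accepted once the limit is raised to 6000.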

Every response message is a SingleFrame.
The differences between the frame types can already be observed on the request side, so the response side is kept simple.
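
For reference, the 7-byte response would look roughly like this on the bus. This is a sketch that assumes the physical normal-fixed identifier layout from ISO 15765-2 (priority 6, then 0xDA, target address, source address). Both helper names are hypothetical.

```python
def normal_fixed_id(source, target, priority=6):
    """29-bit identifier for physical normal-fixed addressing (assumed layout)."""
    return (priority << 26) | (0xDA << 16) | (target << 8) | source

def single_frame(payload):
    """Classic SingleFrame: one length byte followed by 1 to 7 data bytes."""
    if not 1 <= len(payload) <= 7:
        raise ValueError("this sketch only covers payloads of 1 to 7 bytes")
    return bytes([len(payload)]) + payload

# Response from the ECU (0x10) to the tester (0xF1), as configured in the listing.
can_id = normal_fixed_id(source=0x10, target=0xF1)
data = single_frame(b'\x01\x02\x03\x04\x05\x06\x07')
print(hex(can_id), data.hex())
```

The resulting frame is exactly 8 bytes long, so with 'tx_data_min_length' set to 8 no padding byte would be needed for this response.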

Conclusion

  • Wrote the Python code for the request side.
    • SingleFrame of 8 bytes or more.
    • MultiFrame of 4096 bytes or more.
  • Wrote the Python code for the response side.
    • Raised the maximum receivable message size by changing the max_frame_size parameter.
