[CanTp] Vehicle Diagnostic Communication Part 24 [Simulation 12]

[CanTp] Vehicle Diagnostic Communication Part 24 [Simulation 12] 車両診断通信
[CanTp] Vehicle Diagnostic Communication Part 24 [Simulation 12]

Click here for back issues.
https://www.simulationroom999.com/blog/diagnostic-communication-en-back-issue/

Introduction.

Let’s simulate ISO-TP. series.
In this article, we will try to create a pseudo-ECU by the Python package can-isotp.

Is the previous communication method incomplete?

Last time, I made a multi-frame request with can-isotp.
It was realized by FC response from the destination.

However, this is incomplete as communication.
As a communication as ISO15765-2, this is OK, but it is incomplete when assumed up to ISO14229-1.

Vehicle diagnostic communication is generally one communication with a request and a response.
So, the previous communication is incomplete because it is only a request.

It is difficult to reproduce the response message in python-can.
Therefore, I will use can-isotp for the response as well.
Well, I don’t know if it is really possible or not until I try it.

request-response with can-isotp

First, let’s draw an experimental configuration.

can-isotp experiment overall structure,diagnostic request,diagnostic response,python,iso-tp,can.logger,python-can,Virtual CAN Bus

Both will be configured to use can-isotp to perform request-response.

The tricky points this time are as follows.

  • Two scripts, one for the request side and one for the response side, are required.
  • Both need a send function and a receive function.

can-isotp request side script

The request side script is as follows.

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0, 
         'blocksize' : 0,
         'wftmax' : 0,
         'll_data_length' : 8,
         'tx_padding' : 0xCC,
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : False,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      self.bus = VectorBus(channel='0', bitrate=500000)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         time.sleep(0.001) 
   
   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def sendrecv( app, msg ):
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.2)


if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   #sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07')
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')

   print("Exiting")
   app.shutdown()

It uses a thread for receiving.
When you pass a message to the sendrecv function, you can assume that it handles sending and receiving in accordance with ISO 15765-2.

can-isotp response side script

The approximate structure is the same as the request side script, and the response side script is shown below.

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0,
         'blocksize' : 0,
         'wftmax' : 0,
         'll_data_length' : 8,
         'tx_padding' : 0xCC, 
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : True,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      self.bus = VectorBus(channel='0', bitrate=500000)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x10, target_address=0xF1) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001) # Variable sleep time based on state machine state

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def recvsend( app, msg ):
   while True:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.0001)
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         break

if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   #recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')

   print("Exiting")
   app.shutdown()

The creation of the receiving thread is the same.
The only difference is that the previous sendrecv function has become recvsend, a function that receives and sends.
When a message is set in this function, it triggers a response when a request is received.

In other words, most of the complication exists in the implementation of the receiving thread.
Another cause is the use of class functions.
Although using class functions is more readable, it may not be the best way to explain the behavior.

We will check the actual behavior in the next issue.

Conclusion

  • Most vehicle diagnostic communication is a single request-response communication.
    • There are some exceptions such as request-only and response-only communication, but they are quite rare.
  • Both the request and response sides are script fatted by the receive thread mechanism.

Click here for back issues.

コメント

タイトルとURLをコピーしました