[Dcm] Vehicle Diagnostic Communication Part 80 [Simulation 19]

[Dcm] Vehicle Diagnostic Communication Part 80 [Simulation 19] 車両診断通信
[Dcm] Vehicle Diagnostic Communication Part 80 [Simulation 19]

Click here for back issues.
https://www.simulationroom999.com/blog/diagnostic-communication-en-back-issue/

Introduction.

AUTOSAR-Dcm simulation explanation.
In this article, we will explain the Python code for the ReadDataByIdentifier simulation.

Python code for simulating ReadDataByIdentifier

In this article, the ReadDataByIdentifier service will be explained.
The Python code for testing is shown as follows.

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0, 
         'blocksize' : 4,
         'wftmax' : 0,
         'll_data_length' : 8,
         'tx_padding' : 0xCC,
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : False,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      #self.bus = VectorBus(channel='0', bitrate=500000)
      self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001) # Variable sleep time based on state machine state

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def sendrecv( app, msg ):
   
   if msg[0] == 0x00:
      tsleep = msg[1]*0x100 + msg[2]
      tsleep = tsleep / 1000
      print("sleep : %f [ms]" % tsleep)
      time.sleep(tsleep)
   else:
      print("Send msg : %s" % (msg.hex()))
      app.send(msg)
      t1 = time.time()
      while time.time() - t1 < 1:
         if app.stack.available():
            payload = app.stack.recv()
            print("Recv msg : %s" % (payload.hex()))
            break
         time.sleep(0.001)


if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   datas=[
      bytes([0x22, 0x12, 0x34]),
      bytes([0x22, 0x56, 0x78]),
      bytes([0x22, 0x11, 0x11]),
      bytes([0x22, 0x12, 0x34, 0x56, 0x78]),
      bytes([0x22, 0x56, 0x78, 0x12, 0x34]),
      bytes([0x22, 0x12, 0x34, 0x11, 0x11, 0x56, 0x78]),
      bytes([0x22, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34]),
      bytes([0x22, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34, 0x12, 0x34]),
   ]
   
   #while True:
   for i in range(len(datas)):
      sendrecv(app, datas[i])

   print("Exiting")
   app.shutdown()

Python code description for simulating ReadDataByIdentifier.

The first one is a test for single DIDs, multi-DIDs, or mixing unsupported DIDs among multi-DIDs.

And this time, it also includes a very long message.
This is to check the following when requesting with a multi-DID.
“What happens if the response message length exceeds the maximum value of 4095 [bytes]?”

DID=0x1234 is 130 bytes per message.
The first byte of the message is fixed at 0x62 in the ReadDataByIdentifier Response SID.
How many DIDs can be responded with the remaining (4095-1)=4094[bytes]…?

\(\displaystyle\frac{4095[byte]-1[byte]}{130[byte/DID]}=31.49230769230769[DID]\)

In other words, it can respond up to 31 DIDs, but at 32 DIDs, the response message exceeds the maximum value.
This is to see the difference in behavior here.

Conclusion.

  • I wrote Python code for simulation of ReadDataByIdentifier.
  • Test pattern main related to ReadDataByIdentifier multi-DID.
    • Multi-DID.
    • Mix unsupported DIDs.
    • Specify DIDs such that the response message length exceeds the maximum value.

Click here for back issues.

コメント

タイトルとURLをコピーしました