【CAN-FD】車両診断通信 その95【ISO-TP②】

車両診断通信

バックナンバーはこちら。

スポンサーリンク

はじめに

can-isotpでCAN-FDのシミュレーションに向けて。
送受信するPythonコードを書く。

スポンサーリンク

登場人物

博識フクロウのフクさん

イラストACにて公開の「kino_k」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=iKciwKA9&area=1

エンジニア歴8年の太郎くん

イラストACにて公開の「しのみ」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=uCKphAW2&area=1

スポンサーリンク

CAN-FDリクエスト側can-isotpのPythonコード

フクさん
フクさん

早速コードを書いてしまおう。

太郎くん
太郎くん

おー!

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0, 
         'blocksize' : 0,
         'wftmax' : 0,
         'll_data_length' : 64,
         'tx_padding' : 0x55,
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : True,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      #self.bus = VectorBus(channel='0', bitrate=500000)
      self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001) # Variable sleep time based on state machine state

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def sendrecv( app, msg ):
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.2)


if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07')
   
   # 60byteメッセージ
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')
   
   # 120byteメッセージ
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')
   
   # 5000byteメッセージ
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10 … \x09\x10')

   print("Exiting")
   app.shutdown()   
スポンサーリンク

CAN-FDリクエスト側can-isotpのPythonコード解説

フクさん
フクさん

とりあえずDLCを64byte。CAN-FDを有効にした感じ。

フクさん
フクさん

そして、それをもって、

  • 7byte以下のSingleFrame
  • 8byte以上のSingleFrame
  • 4095byte以下のマルチフレーム
  • 4096byte以上のマルチフレーム

を見る感じだ。

太郎くん
太郎くん

なるほど。
CAN-FDになってからの変化点に着目するわけだ。

スポンサーリンク

CAN-FDレスポンス側can-isotpのPythonコード

フクさん
フクさん

対して、レスポンス側はこうなる。

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0,                          # Will request the sender to wait 32ms between consecutive frame. 0-127ms or 100-900ns with values from 0xF1-0xF9
         'blocksize' : 0,                       # Request the sender to send 8 consecutives frames before sending a new flow control message
         'wftmax' : 0,                          # Number of wait frame allowed before triggering an error
         #'ll_data_length' : 8,                  # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
         'll_data_length' : 16,                  # Link layer (CAN layer) works with 8 byte payload (CAN 2.0)
         'tx_padding' : 0xCC,                      # Will pad all transmitted CAN messages with byte 0x00. None means no padding
         'rx_flowcontrol_timeout' : 1000,        # Triggers a timeout if a flow control is awaited for more than 1000 milliseconds
         'rx_consecutive_frame_timeout' : 1000, # Triggers a timeout if a consecutive frame is awaited for more than 1000 milliseconds
         'squash_stmin_requirement' : False,     # When sending, respect the stmin requirement of the receiver. If set to True, go as fast as possible.
         'can_fd' : True,
         'tx_data_min_length' : 8,
         'max_frame_size' : 6000
      }
      self.exit_requested = False
      #self.bus = VectorBus(channel='0', bitrate=500000)
      self.bus = VectorBus(channel='0', bitrate=500000, fd=True)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x10, target_address=0xF1) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001) # Variable sleep time based on state machine state

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def recvsend( app, msg ):
   while True:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.0001)
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         break

if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')

   print("Exiting")
   app.shutdown()
スポンサーリンク

CAN-FDレスポンス側can-isotpのPythonコード解説

太郎くん
太郎くん

あれ?
isotp_paramsにパラメータが増えた?
max_frame_sizeってのがあるけど?

フクさん
フクさん

おー、良く気が付いたね。
defaultだと受信可能サイズって4095byteなんだよね。
リクエスト側から5000byteのメッセージを送るんで、
レスポンス側は受信可能サイズをそれ以上にする必要がある。
今回の場合は6000byteまでは受信可能って設定になってる。

太郎くん
太郎くん

レスポンスメッセージ自体は全部SingleFrameになる感じだねー。

フクさん
フクさん

まぁリクエスト側で各フレームの違いは見れるんで、
レスポンス側はシンプルでいいかなーって思って。

太郎くん
太郎くん

そうだね。
レスポンス側のメッセージはこれでいいのかも。

スポンサーリンク

まとめ

フクさん
フクさん

まとめだよ。

  • リクエスト用のPythonコード書いた。
    • 8byte以上のSingleFrame。
    • 4096以上のMultiFrame。
  • レスポンス用のPythonコード書いた。
    • max_frame_sizeパラメータを弄って受信できるサイズを拡張した。

バックナンバーはこちら。

コメント

タイトルとURLをコピーしました