【CanTp】車両診断通信 その25【シミュレーション⑫】

車両診断通信

バックナンバーはこちら。

スポンサーリンク

はじめに

ISO-TPのシミュレーションをしよう。のシリーズ。
Pythonパッケージcan-isotpによる疑似ECU。

スポンサーリンク

登場人物

博識フクロウのフクさん

イラストACにて公開の「kino_k」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=iKciwKA9&area=1

エンジニア歴8年の太郎くん

イラストACにて公開の「しのみ」さんのイラストを使用しています。
https://www.ac-illust.com/main/profile.php?id=uCKphAW2&area=1

スポンサーリンク

前回の通信だと不完全?

フクさん
フクさん

前回、can-isotpでマルチフレームリクエストをしたんだけど。

太郎くん
太郎くん

うん。FCを送ってなんとか動いたね。

フクさん
フクさん

ISO15765-2としての通信としては、これでOKなんだけど、
ISO14229-1まで考えると不完全なんだよね。

太郎くん
太郎くん

どういうこと?

フクさん
フクさん

車両診断通信は一般的に
リクエストとレスポンスで1つの通信になるんだよ。
というわけで、前回の通信はリクエストだけなんで不完全ってことになる。

太郎くん
太郎くん

って、ことは今回はpytohn-canでレスポンスメッセージを再現するってこと?
結構大変じゃない??

フクさん
フクさん

今回はcan-isotpでレスポンスする。

太郎くん
太郎くん

え?そんなことできるの?

フクさん
フクさん

たぶん?

太郎くん
太郎くん

(まじか)

スポンサーリンク

can-isotpでリクエスト-レスポンス

フクさん
フクさん

まずは構成を書くね。

太郎くん
太郎くん

恒例の図解だね。

太郎くん
太郎くん

まぁ両方ともcan-isotp使ってリクエストレスポンスしますよって構成だね。

フクさん
フクさん

今回のメンドーなポイントとしては以下。

  • リクエスト側、レスポンス側の2つのスクリプトが必要。
  • 共に送信機能と受信機能が必要。
太郎くん
太郎くん

メンドクサソウってことだけはわかった。

スポンサーリンク

can-isotpリクエスト側のスクリプト

フクさん
フクさん

とりあえず、リクエスト側スクリプトはこれ

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0, 
         'blocksize' : 0,
         'wftmax' : 0,
         'll_data_length' : 8,
         'tx_padding' : 0xCC,
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : False,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      self.bus = VectorBus(channel='0', bitrate=500000)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0xF1, target_address=0x10) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         time.sleep(0.001) 
   
   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def sendrecv( app, msg ):
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.2)


if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   #sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07')
   sendrecv(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')

   print("Exiting")
   app.shutdown()
太郎くん
太郎くん

これはまた結構ややこしくなったねー。

フクさん
フクさん

受信用にスレッドを使ってるからねー。
とりあえずはsendrecv関数に電文を渡すと、ISO15765-2に則って送受信すると思って良いよ。

スポンサーリンク

can-isotpレスポンス側のスクリプト

フクさん
フクさん

おおよそ構成は一緒だけどレスポンス側のスクリプトがこれ。

import isotp
import logging
import time
import threading

from can.interfaces.vector import VectorBus

class ThreadedApp:
   def __init__(self):
      isotp_params = {
         'stmin' : 0,
         'blocksize' : 0,
         'wftmax' : 0,
         'll_data_length' : 8,
         'tx_padding' : 0xCC, 
         'rx_flowcontrol_timeout' : 1000,
         'rx_consecutive_frame_timeout' : 1000,
         'squash_stmin_requirement' : False,
         'can_fd' : True,
         'tx_data_min_length' : 8
      }
      self.exit_requested = False
      self.bus = VectorBus(channel='0', bitrate=500000)
      addr = isotp.Address(isotp.AddressingMode.NormalFixed_29bits, source_address=0x10, target_address=0xF1) 
      self.stack = isotp.CanStack(self.bus, address=addr, params=isotp_params, error_handler=self.my_error_handler)

   def start(self):
      self.exit_requested = False
      self.thread = threading.Thread(target = self.thread_task)
      self.thread.start()

   def stop(self):
      self.exit_requested = True
      if self.thread.isAlive():
         self.thread.join()
   
   def send(self, msg):
      self.stack.send(msg)
   
   def my_error_handler(self, error):
      logging.warning('IsoTp error happened : %s - %s' % (error.__class__.__name__, str(error)))

   def thread_task(self):
      while self.exit_requested == False:
         self.stack.process()                # Non-blocking
         #time.sleep(self.stack.sleep_time()) # Variable sleep time based on state machine state
         time.sleep(0.001) # Variable sleep time based on state machine state

   def shutdown(self):
      self.stop()
      self.bus.shutdown()

def recvsend( app, msg ):
   while True:
      if app.stack.available():
         payload = app.stack.recv()
         print("Recv msg : %s" % (payload.hex()))
         break
      time.sleep(0.0001)
   print("Send msg : %s" % (msg.hex()))
   app.send(msg)
   t1 = time.time()
   while time.time() - t1 < 5:
      if app.stack.available():
         break

if __name__ == '__main__':
   app = ThreadedApp()
   app.start()
   
   #recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07')
   recvsend(app, b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10\x01\x02\x03\x04\x05\x06\x07\x08\x09\x10')

   print("Exiting")
   app.shutdown()
フクさん
フクさん

受信スレッドの作りは一緒。
違いは先のsendrecv関数がrecvsendになって、受送信をする関数になったってくらい。
この関数に電文をセットすると、リクエストを受信したことをトリガーにレスポンスを返す振る舞いになる。

太郎くん
太郎くん

なるほど。
ややこしさの大半は受信スレッドを実現するためのものか。

フクさん
フクさん

それもあるけど、クラス化したってのもあるね。
本来クラス化した方が可読性があがるんだけど、振る舞いの説明としてはちょっと不向きかもね。

フクさん
フクさん

実際の動作は次回かな。

スポンサーリンク

まとめ

フクさん
フクさん

まとめだよ。

  • 車両診断通信の大半はリクエスト-レスポンスで一つの通信。
    • 例外的にリクエストのみ、レスポンスのみもあるが、かなりレアなパターン。
  • リクエスト側、レスポンス側ともに受信スレッドの仕組みでスクリプト肥大している。

バックナンバーはこちら。

コメント

タイトルとURLをコピーしました