Farhan Kabir
Farhan Kabir

Reputation: 169

How to send and receive CAN messages between 2 Linux machines without any CAN hardware?

I have been trying to establish CAN communication between my laptop (Ubuntu 20 Virtualbox) and Raspberry Pi (Ubuntu 20) without any CAN hardware, because that will not get the CAN message in the simulation environment. I want to send the CAN data as payload through wifi or USB. Then my python simulation environment should be able to interpret these as CAN messages and forward appropriately.

I have tried vcan from socketcan but it works only between 2 terminals of the same Linux machine. I have been advised to look at slcan. It seems like there is no other option other than using actual CAN hardware. I can't find any tutorial or any other help anywhere.

I will humbly appreciate if anyone can suggest how to send and receive CAN messages between 2 Linux machines without any CAN hardware through wifi or USB?

Python source code I tried:

import sys
import socket
import argparse
import struct
import errno


class CANSocket(object):
  FORMAT = "<IB3x8s"
  FD_FORMAT = "<IB3x64s"
  CAN_RAW_FD_FRAMES = 5

  def __init__(self, interface=None):
    self.sock = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)
    if interface is not None:
      self.bind(interface)

  def bind(self, interface):
    self.sock.bind((interface,))
    self.sock.setsockopt(socket.SOL_CAN_RAW, self.CAN_RAW_FD_FRAMES, 1)

  def send(self, cob_id, data, flags=0):
    cob_id = cob_id | flags
    can_pkt = struct.pack(self.FORMAT, cob_id, len(data), data)
    self.sock.send(can_pkt)

  def recv(self, flags=0):
    can_pkt = self.sock.recv(72)

    if len(can_pkt) == 16:
      cob_id, length, data = struct.unpack(self.FORMAT, can_pkt)
    else:
      cob_id, length, data = struct.unpack(self.FD_FORMAT, can_pkt)

    cob_id &= socket.CAN_EFF_MASK
    return (cob_id, data[:length])


def format_data(data):
    return ''.join([hex(byte)[2:] for byte in data])


def generate_bytes(hex_string):
    if len(hex_string) % 2 != 0:
      hex_string = "0" + hex_string

    int_array = []
    for i in range(0, len(hex_string), 2):
        int_array.append(int(hex_string[i:i+2], 16))

    return bytes(int_array)


def send_cmd(args):
    try:
      s = CANSocket(args.interface)
    except OSError as e:
      sys.stderr.write('Could not send on interface {0}\n'.format(args.interface))
      sys.exit(e.errno)

    try:
      cob_id = int(args.cob_id, 16)
    except ValueError:
      sys.stderr.write('Invalid cob-id {0}\n'.format(args.cob_id))
      sys.exit(errno.EINVAL)

    s.send(cob_id, generate_bytes(args.body), socket.CAN_EFF_FLAG if args.extended_id else 0)


def listen_cmd(args):
    try:
      s = CANSocket(args.interface)
    except OSError as e:
      sys.stderr.write('Could not listen on interface {0}\n'.format(args.interface))
      sys.exit(e.errno)

    print('Listening on {0}'.format(args.interface))

    while True:
        cob_id, data = s.recv()
        print('%s %03x#%s' % (args.interface, cob_id, format_data(data)))


def parse_args():
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    send_parser = subparsers.add_parser('send', help='send a CAN packet')
    send_parser.add_argument('interface', type=str, help='interface name (e.g. vcan0)')
    send_parser.add_argument('cob_id', type=str, help='hexadecimal COB-ID (e.g. 10a)')
    send_parser.add_argument('body', type=str, nargs='?', default='',
      help='hexadecimal msg body up to 8 bytes long (e.g. 00af0142fe)')
    send_parser.add_argument('-e', '--extended-id', action='store_true', default=False,
      help='use extended (29 bit) COB-ID')
    send_parser.set_defaults(func=send_cmd)

    listen_parser = subparsers.add_parser('listen', help='listen for and print CAN packets')
    listen_parser.add_argument('interface', type=str, help='interface name (e.g. vcan0)')
    listen_parser.set_defaults(func=listen_cmd)

    return parser.parse_args()


def main():
    args = parse_args()
    args.func(args)


if __name__ == '__main__':
    main()

Upvotes: 2

Views: 1929

Answers (1)

secret squirrel
secret squirrel

Reputation: 807

CAN is a broadcast protocol and not a connection protocol like TCP or datagram protocol like UDP. However, ff you want abstract the hardware but still simulate the higher layer behaviour of CAN, I'd suggest the best way to do would be with UDP (but TCP as suggested by @M. Spiller also works, just means handling accepting connections which you don't have to bother with for UDP), and send and receive packets with a byte structure the same as a CAN frame. You would also need to account for filters and error frames.

If you're going to use socketcan on linux, your UDP packets should look like:

struct {
    uint16_t id;       /* CAN ID of the frame */
    uint8_t dlc;       /* frame payload length in bytes */
    uint8_t data[8];   /* CAN frame payload */
} simulated_udp_can_frame;

You should read the socketCAN kernel documentation for more detail.

Upvotes: 1

Related Questions