A python-can integration for the CANsub CAN bus interface family by CSS Electronics. Source on GitHub.
This package registers the CANsub as a standard python-can interface, making it compatible with all python-can tools and workflows. It also adds a CSV logger compatible with the webCAN browser tool provided with the device.
Tip: This README is optimized for LLMs. When using an AI coding assistant with this package, provide this file as context for accurate results.
The python-can-cansub package and the CANsub device communicate over a versioned API. They are compatible when the package supports the API version used by the device firmware.
- Each python-can-cansub release supports one API version. The supported API version for each release is listed in the python-can-cansub changelog.
- Each CANsub firmware release uses one API version. The API version for each firmware release is listed in the CANsub changelog (provided with the device).
To check compatibility, look up the API version of the package release and of the device firmware release in their respective changelogs. If they match, the two are compatible.
If the package finds a connected device whose API version it does not support, it emits a warning. Update the package or the device firmware so their API versions align.
pip install python-can-cansubWhen python-can-cansub is installed, the cansub interface is automatically registered with python-can. Import with:
import canPython-can defines a hardware configuration by an interface and a channel (a single interface can have multiple channels).
The CANsub interface is fixed "cansub". The channel is constructed from the device hostname (unique) and channel index.
| Connection | Hostname | python-can channel string |
|---|---|---|
| USB | [DEVICE-ID]-usb.local |
[DEVICE-ID]-usb.local@[channel] |
| Ethernet | [DEVICE-ID]-eth.local |
[DEVICE-ID]-eth.local@[channel] |
The device-ID is printed on the device label. Channel indexing is 1-based - the first channel is 1.
A configuration is passed to can.Bus to open a bus.
Example of a fixed configuration:
configs = [{"interface": "cansub", "channel": "aabbccdd-usb.local@1"},
{"interface": "cansub", "channel": "aabbccdd-usb.local@2"}]Example of using detect_available_configs to automatically discover (uses mDNS) all connected CANsub devices and channels:
configs = can.detect_available_configs(interfaces=["cansub"])
# e.g. [{"interface": "cansub", "channel": "aabbccdd-usb.local@1"},
# {"interface": "cansub", "channel": "aabbccdd-usb.local@2"},
# {"interface": "cansub", "channel": "11223344-eth.local@1"},
# {"interface": "cansub", "channel": "11223344-eth.local@2"}]In the above example two CANsub devices are detected, each with two channels. One device is connected via USB and the other via Ethernet.
Note:
data_bitrateis required even on a classic (non-FD) bus. On a non-FD bus, simply set it to e.g.1_000_000(1 Mbit/s).
Tip: Pass
listen_only=Truetocan.Busto monitor a bus without transmitting or acknowledging frames.
Tip: Pass
error_frames=Truetocan.Busto receive error frames.
with can.Bus(interface="cansub", channel="aabbccdd-usb.local@1", bitrate=250_000, data_bitrate=1_000_000) as bus:
passwith can.Bus(interface=configs[0]["interface"], channel=configs[0]["channel"], bitrate=250_000, data_bitrate=1_000_000) as bus:
passwith (can.Bus(interface=configs[0]["interface"], channel=configs[0]["channel"], bitrate=250_000, data_bitrate=1_000_000) as bus1,
can.Bus(interface=configs[1]["interface"], channel=configs[1]["channel"], bitrate=250_000, data_bitrate=1_000_000) as bus2):
passTip:
**configunpacks a config dict directly intocan.Buskeyword arguments:with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus: pass
The data connection to the device is secured by TLS. When connecting via an IP address (where hostname verification will fail), server_cert=None can be used to disable certificate validation:
with can.Bus(interface="cansub", channel="192.168.1.10@1", server_cert=None, bitrate=250_000, data_bitrate=1_000_000) as bus:
passIf TLS mutual authentication is enabled, client_cert can be used to provide a tuple of paths to the client certificate (.crt file) and its unencrypted private key (.key file), i.e. ("cert", "key"):
with can.Bus(**configs[0], client_cert=("/path/to/client.crt", "/path/to/client.key"), bitrate=250_000, data_bitrate=1_000_000) as bus:
passwith can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
# Transmit
msg_tx = can.Message(is_extended_id=False, arbitration_id=0x123, data=[0x01, 0x02, 0x03, 0x04])
bus.send(msg_tx)
# Receive with timeout
msg_rx = bus.recv(timeout=1.0)
print(msg_rx)Error frame reporting is disabled by default; enable it by passing error_frames=True to can.Bus. Bus errors are then received as a can.Message with is_error_frame set. The error type is encoded in arbitration_id, which can be converted to a CanSubErrorFrameType enum:
from python_can_cansub import CanSubErrorFrameType
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000, error_frames=True) as bus:
msg = bus.recv(timeout=1.0)
if msg and msg.is_error_frame:
error_type = CanSubErrorFrameType(msg.arbitration_id)
print(f"Bus error: {error_type.name}") # e.g. "Bus error: ACK"Apply hardware filters by passing can_filters to can.Bus. Each filter specifies a can_id, a can_mask, and whether to match standard (extended=False) or extended (extended=True) frames. A frame passes if (frame_id & can_mask) == (can_id & can_mask).
filters = [
{"can_id": 0x123, "can_mask": 0x7FF, "extended": False}, # standard frames, exact ID match
{"can_id": 0x000, "can_mask": 0x000, "extended": True}, # all extended frames
]
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000, can_filters=filters) as bus:
msg = bus.recv(timeout=1.0)
print(msg)Tip: Applying hardware filters reduces the network load between the CANsub and the connected client.
bus.recv() blocks until a frame arrives. A can.Notifier runs a background thread that dispatches received frames to one or more listeners, allowing the main program to continue other work.
python-can provides built-in listeners including can.Printer (print to stdout) and can.Logger (log to file). The example below prints to stdout and logs to a CSV file while the main program continues. Custom listeners can be implemented by subclassing can.Listener.
from time import sleep
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
with can.Notifier([bus], listeners=[can.Printer(), can.Logger("log.csv")]):
# Perform other tasks here while frames are received in the background
sleep(10)Periodic transmission jobs can be started with bus.send_periodic().
Most periodic transmission job types can be offloaded to the CANsub hardware, providing much better transmission time accuracy (compared to a host-scheduled transmission). A host-side background task is used only as a fallback when hardware transmission is not available.
from time import sleep
msgs = [
can.Message(is_extended_id=False, arbitration_id=0x123, data=[0x01, 0x02, 0x03, 0x04]),
can.Message(is_extended_id=False, arbitration_id=0x124, data=[0x05, 0x06, 0x07, 0x08]),
can.Message(is_extended_id=False, arbitration_id=0x125, data=[0x09, 0x0A, 0x0B, 0x0C]),
]
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
# period: time between individual frames (sequence repeats every len(msgs) * period)
# duration: total transmission time in seconds (None = transmit indefinitely)
task = bus.send_periodic(msgs, period=0.1, duration=5.0)
# Perform other tasks here while frames are transmitted in the background
sleep(6)can.MessageSync can be used to replay messages from a log file.
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
with can.LogReader("log.csv") as reader:
for msg in can.MessageSync(messages=reader):
bus.send(msg)On import, this package overrides the default python-can .csv reader and writer with a format compatible with the webCAN browser tool provided with the device. This applies automatically wherever .csv files are read or written, including can.Logger, can.LogReader, and the command-line tools.
The writer (CanSubCSVWriter) and reader (CanSubCSVReader) can also be used directly:
from python_can_cansub import CanSubCSVWriter, CanSubCSVReader
# Write received messages to a webCAN-compatible CSV file
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
with CanSubCSVWriter("log.csv") as writer:
msg = bus.recv(timeout=1.0)
if msg:
writer.on_message_received(msg)
# Read messages back from the CSV file
with CanSubCSVReader("log.csv") as reader:
for msg in reader:
print(msg)python-can includes several command line tools. All tools accept --interface and --channel to select the bus, following the same configuration as the API.
The common argument pattern for the CANsub:
--interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000
Note that the filter argument supported by some command-line tools is limited to standard (11-bit) CAN IDs. Filtering on extended (29-bit) IDs requires the python-can API.
Log received frames to a file (format inferred from file extension):
can_logger --interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000 --file_name log.csvPlay back a previously recorded log file:
can_player --interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000 log.csvLive terminal viewer showing received frames, updated counts, timestamps, and byte-level changes:
can_viewer --interface cansub --channel aabbccdd-usb.local@1 --bitrate 250000 --data-bitrate 1000000On Windows, the can_viewer requires windows-curses (pip install windows-curses).
Forward all frames received on one bus to another (e.g. bridge two CANsub channels):
can_bridge --bus1-interface cansub --bus1-channel aabbccdd-usb.local@1 --bus1-bitrate 250000 --bus1-data-bitrate 1000000 \
--bus2-interface cansub --bus2-channel aabbccdd-usb.local@2 --bus2-bitrate 250000 --bus2-data-bitrate 1000000Convert a log file between formats; the format is inferred from the file extension:
can_logconvert log.csv log.ascThe following packages complement python-can-cansub and are included here as inspiration for working with CAN data in Python.
cantools is a Python package for encoding and decoding CAN messages. Encoding/decoding rules can be created or loaded from DBC (and other) database files. It works directly with can.Message objects from python-can.
pip install cantoolsA database can be constructed directly in Python without a database file:
import cantools
db = cantools.database.Database()
msg_def = cantools.database.can.Message(
frame_id=0x123,
name="Message1",
length=8,
signals=[
cantools.database.can.Signal(name="Signal1", start=0, length=16, scale=0.1, offset=0.0, minimum=0.0, maximum=100.0),
cantools.database.can.Signal(name="Signal2", start=16, length=16, scale=0.1, offset=0.0, minimum=0.0, maximum=100.0),
]
)
db.add_message(msg_def)import cantools
db = cantools.database.load_file("database.dbc")
msg_def = db.get_message_by_name("Message1")Encode signal values into the byte payload of a can.Message:
data = msg_def.encode({"Signal1": 1.0, "Signal2": 42.5})
msg_tx = can.Message(arbitration_id=msg_def.frame_id,
is_extended_id=msg_def.is_extended_frame,
data=data)
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
bus.send(msg_tx)Decode the byte payload of a received can.Message back into signal values:
with can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
msg_rx = bus.recv(timeout=1.0)
if msg_rx:
signals = db.decode_message(msg_rx.arbitration_id, msg_rx.data)
print(signals) # e.g. {'Signal1': 1.0, 'Signal2': 42.5}asammdf is a Python package for reading and writing MDF (Measurement Data Format) files.
When asammdf is installed, python-can automatically gains support for reading MDF log files via can.LogReader, allowing MDF recordings to be played back directly using can.MessageSync:
pip install asammdfwith can.Bus(**configs[0], bitrate=250_000, data_bitrate=1_000_000) as bus:
with can.LogReader("recording.mf4") as reader:
for msg in can.MessageSync(messages=reader):
bus.send(msg)