#!/usr/bin/env python3 """ pyJeepCan -- live dashboard for JEEP CAN bus traffic. Originally created: 11.2023 Last updated: 05.2026 Reads a curated set of message IDs from CAN-C and CAN-IHS, decodes each field with a purpose-built helper (gear lookup, PRNDL, transfer- case codes, oil-pressure kPa->PSI, etc.) and renders the result as a terminal dashboard via curses. USAGE ./pyJeepCan.py [--ihs IFACE] [--c IFACE] [--experimental] [--log-decode FILE] [--help] OPTIONS --ihs IFACE CAN-IHS interface (default: vcan0) --c IFACE CAN-C interface (default: vcan1) --experimental Also enable the UNVERIFIED candidate IDs from the Bus & Message Reference candidate-IDs table (fuel %, ambient temp, throttle, cruise, 4-tire TPMS, odometer). Expect false readings on any platform but the one the legacy script was tuned for; verify with candump first. --log-decode FILE Append " -> " per frame to FILE for offline analysis. Useful for capturing a session for later diff/replay without polluting the TUI. -h, --help Print this message and exit. KEYS q Quit the dashboard. Ctrl+C also works. REQUIRES - python-can: pip install python-can - can-utils (vcan): apt install can-utils - Two SocketCAN interfaces up. For a bench rig replaying a captured log without a vehicle present: ip link add dev vcan0 type vcan && ip link set vcan0 up ip link add dev vcan1 type vcan && ip link set vcan1 up canplayer vcan0=can0 vcan1=can1 -i 4mi-loop.log ./pyJeepCan.py # in another terminal REFERENCE https://magikh0e.pl/pubCarHacking/bus-message-reference.html#live-data-map -- the IDs / bus / byte-offset / decode table this script implements. https://magikh0e.pl/pubCarHacking/bus-message-reference.html#candidate-ids -- the UNVERIFIED candidate IDs that --experimental adds. PLATFORM NOTE All non-experimental IDs and byte offsets were verified on the JEEP platform the legacy script was developed against. FCA / Stellantis reassigns CAN IDs across model years and platforms; verify with candump on YOUR vehicle before relying on a reading. CAVEAT Read-only. No writes back to the bus. Safe to run on a moving vehicle from the standpoint of "this script alone can't break anything" -- but eyes-on-road, dashboard-on-co-pilot-laptop, obviously. REVISION NOTES (2026-05-16) - Shebang is now `#!/usr/bin/env python3` (was hardcoded to /usr/bin/python3) - python-can API fixes: * `bustype=` -> `interface=` (deprecated since python-can 4.0) * `filter=` -> `can_filters=` (the legacy kwarg was wrong; python-can silently ignored it, so kernel-side filtering never engaged and every frame on every interface fell through to Python-side matching) - Multi-channel architecture rewrite: one Bus per CAN interface, threaded reader for each, shared queue.Queue feeding the main display loop. Replaces the legacy "Bus('')" + per-filter `can_channel` dict that was never standard python-can. - Curses lifecycle managed via curses.wrapper() -- terminal restoration is guaranteed even on uncaught exception. - gear() / xfer() now return '?' for unmapped bytes (legacy returned None, which rendered as the literal "None" in the dashboard). - Bare `except:` replaced with specific exception types. - Decode helpers gained docstrings explaining each decode rule. - New CLI flags: * `--ihs IFACE`, `--c IFACE` interfaces no longer hardcoded * `--experimental` enables the BMR candidate-IDs table (fuel, ambient, throttle, cruise, 4-tire TPMS, odometer) * `--log-decode FILE` append-only frame log for offline analysis * `-h, --help` argparse usage block - Pre-flight check for both interfaces (with bitrate hints and a working `ip link add ... type vcan` command for bench rigs) - 'q' / 'Q' quits the dashboard in addition to Ctrl+C Originals preserved as pyJeepCan.legacy.py and pyJeepCan.legacy.py.txt. """ import argparse import curses import os import queue import sys import threading import time try: import can except ImportError: sys.stderr.write( "ERROR: python-can not installed. Install with:\n" " pip install python-can\n" ) sys.exit(2) # ===================================================================== # Decode helpers # ===================================================================== # Each helper takes the raw 8-byte payload `x` plus one or more byte # indices and returns a value ready for display. All offsets / constants # come from the per-message decode rules documented in BMR's live-data # map table; the helpers are platform-neutral, but the byte INDICES we # pass them in build_monitors() are platform-specific. def raw8(x, a): return x[a] def raw16(x, a, b): return (x[a] << 8) + x[b] def raw32(x, a, b, c, d): """32-bit big-endian read (odometer, fuel-trim totals, etc.).""" return (x[a] << 24) + (x[b] << 16) + (x[c] << 8) + x[d] def volt(x, a): """Battery V: stored as decivolts, divide by 10.""" return x[a] / 10 def temp(x, a): """Coolant / IAT / oil temp: byte holds (deg C + 40). Convert to F.""" return round((((x[a] - 40) * (9 / 5)) + 32)) def tilt(x, a, b): """Roll / Tilt / Yaw: 16-bit BE with 2048 zero-offset, scaled /10.""" return round(((x[a] << 8) + x[b] - 2048) / 10) def rpm(x, a, b): """Engine RPM: 16-bit BE. 0xFFFF = engine-off sentinel.""" if x[a] == 0xFF: return 0 return (x[a] << 8) + x[b] def mph(x, a, b): """Vehicle speed: 16-bit BE, /200 = MPH, one decimal.""" return round(((x[a] << 8) + x[b]) / 200, 1) def psi(x, a): """Oil pressure / PS PSI: byte * 4 kPa, then kPa->PSI conversion.""" return round((x[a] * 4) * 0.145038) def gear(x, a): """Gear selector (PRNDL). The byte happens to be ASCII for the gear label -- 0x50 = 'P', 0x52 = 'R', etc. We map back to that char.""" return { 0x50: 'P', 0x52: 'R', 0x4E: 'N', 0x44: 'D', 0x31: '1', 0x32: '2', 0x33: '3', 0x34: '4', 0x35: '5', 0x36: '6', 0x37: '7', 0x38: '8', }.get(x[a], '?') def xfer(x, a): """Transfer case position. Mapping is platform-specific.""" return { 0x00: '4x2H', 0x02: 'N', 0x10: '4x4H', 0x20: 'N', 0x40: '4x4L', 0x80: 'Shifting', }.get(x[a], '?') def steer(x, a, b): """Steering angle (and rate): 16-bit BE with 0x1000 zero-offset.""" return ((x[a] << 8) + x[b]) - 0x1000 def pstemp(x, a): """Power-steering temp: byte * 9/5 + 32 (no -40 offset; empirical on the test vehicle).""" return round(((x[a] * (9 / 5)) + 32)) # ---- Candidate helpers (used only with --experimental). All UNVERIFIED. # See BMR #candidate-ids for the rationale and confidence groupings. def fuel(x, a): """Fuel level percentage: byte / 2.55 -> 0..100.""" return round(x[a] / 2.55) def pct(x, a): """Generic 0..100% (throttle position, accelerator pedal, etc.).""" return round(x[a] / 2.55) def amb(x, a): """Outside ambient temp. Byte / 2 - 40 in Celsius, returned in F.""" return round((((x[a] / 2) - 40) * 9 / 5) + 32) def tpms_psi(x, a): """TPMS tire pressure: byte / 4 = PSI on the JL Wrangler convention.""" return round(x[a] / 4) def odo(x, a, b, c, d): """Odometer: 32-bit BE, units 0.1 mile -> miles.""" return round(raw32(x, a, b, c, d) / 10) def bit_flag(x, a, mask): """Boolean bit test. Returns 'Y'/'N' for terminal display.""" return 'Y' if (x[a] & mask) else 'N' def wrapper(msg_data, name, func, fmt, *args): """Apply a decode function to a CAN frame's data bytes. The `name` and `fmt` are carried through the monitor tuple but consumed by the caller, not by this function.""" return func(msg_data, *args) # ===================================================================== # Monitor list # ===================================================================== # Each entry is (arbitration_id, can_interface, [field, ...]). # Each field is (label, decode_func, output_type, *byte_indices). # build_monitors() takes the runtime interface names so the table # isn't bound to vcan0/vcan1 at import time. def build_monitors(ihs, c, include_experimental=False): monitors = [ (0x2C2, ihs, [("Batt V", volt, str, 2), ("Batt ?", raw8, hex, 0)]), (0x02B, c, [("Roll", tilt, str, 0, 1), ("Tilt", tilt, str, 2, 3), ("Yaw", tilt, str, 4, 5)]), (0x322, ihs, [("RPM", rpm, str, 0, 1), ("MPH", mph, str, 2, 3)]), (0x127, c, [("IAT", temp, str, 0), ("Coolant", temp, str, 1)]), (0x13D, c, [("Oil Temp", temp, str, 3), ("Oil Pres", psi, str, 2)]), (0x093, c, [("Gear", gear, str, 2)]), (0x277, c, [("Transfer", xfer, str, 0)]), (0x023, c, [("Steer Angl", steer, str, 0, 1), ("Rate", steer, str, 2, 3)]), (0x128, c, [("PS Temp", pstemp, str, 1), ("PS PSI", psi, str, 2)]), ] if include_experimental: # UNVERIFIED. Bytes vary by year/model. See BMR #candidate-ids. # Subset of the full candidate list -- the ones most useful to # have on a live dashboard. Expand here as you verify more. monitors.extend([ (0x308, ihs, [("Fuel %", fuel, str, 0)]), (0x29C, ihs, [("Outside F", amb, str, 0)]), (0x129, c, [("Throttle", pct, str, 2)]), (0x2D0, ihs, [("Cruise", raw8, str, 0)]), (0x371, ihs, [("TPMS FL", tpms_psi, str, 0), ("TPMS FR", tpms_psi, str, 1), ("TPMS RL", tpms_psi, str, 2), ("TPMS RR", tpms_psi, str, 3)]), (0x3E0, ihs, [("Odo mi", odo, str, 0, 1, 2, 3)]), ]) return monitors def filters_for_channel(monitors, channel): """Build a python-can filter list for one channel. Each filter accepts exactly one ID; the kernel does the matching, so the main loop only sees frames we care about. 0x7FF mask = 11-bit standard CAN IDs (the JEEP/FCA stack doesn't use extended IDs).""" return [ {"can_id": mid, "can_mask": 0x7FF, "extended": False} for (mid, ch, _fields) in monitors if ch == channel ] # ===================================================================== # CAN reader thread # ===================================================================== def reader_thread(channel, can_filters, msg_queue, stop_event): """Read messages from one CAN channel into the shared queue. Each enqueued item is (channel_name, can.Message). The main thread dispatches based on channel + arbitration_id so two IDs that happen to collide across buses still display in the right row.""" try: bus = can.interface.Bus( channel=channel, interface='socketcan', can_filters=can_filters, ) except Exception as e: # noqa: BLE001 -- surface anything to the TUI msg_queue.put(('__error__', f"{channel}: {e}")) return try: while not stop_event.is_set(): msg = bus.recv(timeout=0.5) if msg is not None: msg_queue.put((channel, msg)) finally: try: bus.shutdown() except Exception: # noqa: BLE001 pass # ===================================================================== # Curses display loop # ===================================================================== def draw_labels(stdscr, monitors): """Render the static field labels once at startup.""" stdscr.clear() for row, (_mid, _ch, fields) in enumerate(monitors): for col, field in enumerate(fields): label = field[0] stdscr.addstr(row, col * 30, label) stdscr.refresh() def run(stdscr, args, log_fp=None): curses.curs_set(0) stdscr.nodelay(True) monitors = build_monitors( args.ihs, args.c, include_experimental=args.experimental, ) draw_labels(stdscr, monitors) msg_queue = queue.Queue() stop_event = threading.Event() # One reader thread per channel that actually has IDs to watch. channels_in_use = sorted({ch for (_id, ch, _f) in monitors}) threads = [] for ch in channels_in_use: filt = filters_for_channel(monitors, ch) if not filt: continue t = threading.Thread( target=reader_thread, args=(ch, filt, msg_queue, stop_event), daemon=True, ) t.start() threads.append(t) status_row = len(monitors) + 1 try: while True: # Quit on 'q' / 'Q'. nodelay() returns -1 when no key is # pressed, so this poll is cheap. try: key = stdscr.getch() if key in (ord('q'), ord('Q')): break except curses.error: pass try: channel, msg = msg_queue.get(timeout=0.25) except queue.Empty: continue if channel == '__error__': stdscr.addstr(status_row, 0, f"ERR: {str(msg)[:79]:<79}") stdscr.refresh() continue for row, (mid, ch, fields) in enumerate(monitors): if msg.arbitration_id != mid or channel != ch: continue values = [] for col, field in enumerate(fields): try: val = wrapper(msg.data, *field) text = field[2](val) except (IndexError, KeyError, ValueError, TypeError): text = '?' values.append((col, text)) stdscr.addstr(row, (col * 30) + 15, f"{text:<5}") if log_fp is not None: log_fp.write( f"{time.time():.6f} {ch} {mid:03X} " f"{msg.data.hex().upper()} -> " f"{' '.join(f'{f[0]}={t}' for f,(_,t) in zip(fields, values))}\n" ) log_fp.flush() stdscr.refresh() finally: stop_event.set() for t in threads: t.join(timeout=1.0) # ===================================================================== # Main / CLI # ===================================================================== def _iface_exists(name): """Linux: an interface exists iff /sys/class/net// exists.""" return os.path.isdir(f'/sys/class/net/{name}') def main(): parser = argparse.ArgumentParser( description="Live JEEP CAN bus dashboard (CAN-IHS + CAN-C).", epilog="Press 'q' (or Ctrl+C) to quit.", ) parser.add_argument( '--ihs', default='vcan0', help='CAN-IHS interface (default: vcan0)', ) parser.add_argument( '--c', default='vcan1', help='CAN-C interface (default: vcan1)', ) parser.add_argument( '--experimental', action='store_true', help='Also enable the UNVERIFIED candidate IDs from the BMR ' 'candidate-IDs table', ) parser.add_argument( '--log-decode', metavar='FILE', default=None, help='Append each decoded frame to FILE for offline analysis', ) args = parser.parse_args() # Pre-flight outside curses so error messages are readable. for label, iface in [('--ihs', args.ihs), ('--c', args.c)]: if not _iface_exists(iface): bitrate = 125000 if label == '--ihs' else 500000 sys.stderr.write( f"ERROR: CAN interface '{iface}' (from {label}) not found.\n" f" For a virtual bench rig:\n" f" ip link add dev {iface} type vcan && ip link set {iface} up\n" f" For a real CAN adapter:\n" f" ip link set {iface} up type can bitrate {bitrate}\n" ) sys.exit(1) log_fp = None if args.log_decode: try: log_fp = open(args.log_decode, 'a', encoding='utf-8') except OSError as e: sys.stderr.write(f"ERROR: cannot open --log-decode file '{args.log_decode}': {e}\n") sys.exit(1) try: curses.wrapper(run, args, log_fp) except KeyboardInterrupt: pass finally: if log_fp is not None: log_fp.close() if __name__ == '__main__': main()