WireWatch: A Minimal I2C Bus Monitor with Raspberry Pi Pico W
What this is
WireWatch is a tiny, local I2C sniffer that runs on a Raspberry Pi Pico W. It passively watches SDA and SCL, decodes start/stop conditions, addresses and data bytes, and provides a minimal web UI on your LAN so you can inspect I2C traffic without sending it to the cloud. The goal is low parts count, low complexity, and useful output for debugging sensors and displays.
What you need
- Raspberry Pi Pico W (any RP2040 board with Wi‑Fi will do)
- 2× 4.7k pull‑up resistors (to 3.3V)
- 3 female dupont wires
- Optional: small breadboard or header pins
Wiring
- Connect the I2C bus ground to the Pico GND.
- Connect SDA from the target bus to a GPIO capable of input — for this example use GP0.
- Connect SCL from the target bus to another GPIO — use GP1.
- Add 4.7k pull‑ups from SDA and SCL to 3.3V (do not use 5V pull‑ups).
Important: this is a passive monitor. Do not try to drive the bus from the Pico while sniffing. If you need to talk to devices, do it from a separate master or power cycle the Pico out of the way.
How it works (short)
The Pico watches both SDA and SCL as inputs. Each rising edge of SCL is where a data bit is valid, so we sample SDA on SCL rising and timestamp it. Start and stop conditions are detected as SDA transitions while SCL is high. A background decoder reconstructs bits into bytes and annotates read/write and ACK conditions. The web server serves the latest N transactions as a tiny HTML page and an endpoint that returns JSON for scripted inspection.
MicroPython sketch (essential parts)
This is the minimal decoding and server loop. Save as code.py on the Pico. Shrink or extend as you need.
import network, socket, time, ujson
from machine import Pin
SDA_PIN = 0
SCL_PIN = 1
sda = Pin(SDA_PIN, Pin.IN)
scl = Pin(SCL_PIN, Pin.IN)
buffer = [] # raw edge samples: (t_us, sda, scl)
transactions = [] # decoded items for UI
MAX_TX = 200
# ISR: record edges with microsecond timestamps
last_scl = scl.value()
last_sda = sda.value()
def poll_edges(p):
global last_scl, last_sda
s = sda.value(); c = scl.value()
if s != last_sda or c != last_scl:
buffer.append((time.ticks_us(), s, c))
last_sda = s; last_scl = c
# Use a timer or soft IRQ; simple loop will work too
scl.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=poll_edges)
sda.irq(trigger=Pin.IRQ_RISING | Pin.IRQ_FALLING, handler=poll_edges)
# Simple decoder run in main loop
def decode_edges():
global buffer, transactions
if not buffer: return
edges = buffer
buffer = []
bits = []
i = 0
while i < len(edges)-1:
t, s, c = edges[i]
t2, s2, c2 = edges[i+1]
# detect start: SDA goes low while SCL high
if s != s2 and c == 1:
if s == 1 and s2 == 0:
bits.append(('START', t2))
elif s == 0 and s2 == 1:
bits.append(('STOP', t2))
# sample SDA on SCL rising
if c == 0 and c2 == 1:
bits.append(('BIT', s2, t2))
i += 1
# convert bit stream to bytes
cur = []
mode = 'idle'
for item in bits:
if item[0]=='START':
cur = []
mode = 'in_frame'
continue
if item[0]=='STOP':
if cur:
transactions.append(cur)
if len(transactions)>MAX_TX: transactions.pop(0)
mode='idle'; cur=[]; continue
if item[0]=='BIT' and mode=='in_frame':
cur.append(item[1])
# group bits into bytes
if len(cur) % 9 == 0: # 8 data bits + ACK
byte_bits = cur[-9:-1]
b = 0
for bit in byte_bits:
b = (b<<1) | bit
ack = cur[-1]
cur[-9:] = []
# append as tuple (byte, ack)
transactions.append((b, ack))
if len(transactions)>MAX_TX: transactions.pop(0)
# Very small web server
ap_if = network.WLAN(network.STA_IF)
ap_if.active(True)
ap_if.connect('YOUR_SSID','YOUR_PASS')
while not ap_if.isconnected(): time.sleep(1)
addr = socket.getaddrinfo('0.0.0.0', 80)[0][-1]
s = socket.socket()
s.bind(addr); s.listen(1)
print('listening on', addr)
def http_response(conn, content):
conn.send('HTTP/1.0 200 OK\r\nContent-Type: text/html\r\n\r\n')
conn.send(content)
while True:
decode_edges()
try:
cl, addr = s.accept()
req = cl.recv(1024)
path = req.split(b' ')[1]
if path==b'/data':
cl.send('HTTP/1.0 200 OK\r\nContent-Type: application/json\r\n\r\n')
cl.send(ujson.dumps(transactions))
else:
# minimal page
page = 'WireWatch
'+ujson.dumps(transactions)+'
'
http_response(cl, page)
cl.close()
except Exception as e:
pass
Using WireWatch
- Power your I2C devices and connect wiring as above.
- Flash the MicroPython code and edit your Wi‑Fi credentials in the sketch.
- Open the Pico's IP in a browser on the same LAN. The page shows recent decoded items and
/datareturns JSON for scripting.
Interpretation: transaction tuples are either (byte, ack) pairs for decoded bytes, or the raw 'START'/'STOP' markers depending on the simplified decoder above. This is intentionally minimal — it gives you addresses and payloads quickly.
Troubleshooting
- No data? Check pull‑ups and that the Pico isn't powering the bus. Use a scope or logic analyzer to confirm signals if you can.
- Garbled bytes? Your ISR queue may overflow under heavy traffic; lowering bus speed or filtering edges in hardware is the fix. You can also increase buffer processing frequency.
- No Wi‑Fi? Verify credentials and that the Pico W firmware supports your AP's security. Use a phone hotspot if needed.
Next steps
- Improve decoding: reconstruct address/read/write framing and correlate ACK/NACK cleanly.
- Add a nicer local UI (AJAX + canvas timeline) or WebSocket for live updates.
- Port the ISR to PIO for higher bus speeds and more reliable sampling.
If you want a compact, privacy‑friendly way to peek at what's happening on your I2C bus without involving the cloud, WireWatch is a practical starting point — simple, hackable, and useful for most low‑speed sensors and displays.