BlackBull goes multi-protocol (part 3) - one app.py, two protocols
The first two posts were the engineering: part 1 rebuilt the connection dispatcher so BlackBull's core stopped assuming HTTP, and part 2 put a lock-free, single-owner MQTT 5 broker on top of that seam. This post is the payoff - what all of it looks like from an application author's chair.
The short version: one file, one pip install, two protocols. One decorator for HTTP, one for MQTT.
from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension, Message

app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))

@app.route(path='/')
async def index():
    return "HTTP here; MQTT broker on :1883."

@mqtt.on_message(topic='sensors/{room}/temperature')
async def on_temperature(msg: Message, room: str):
    print(f"{room}: {msg.payload.decode()}")  # {room} captured like a path param

app.run(port=8000)  # HTTP on 8000, MQTT on 1883
mqtt.on_message deliberately mirrors app.route: both decorate an async function, both match on an address pattern, and {room} captures a topic level the same way {task_id} captures a URL path segment. The difference is semantics - an HTTP route handler produces the response; an MQTT tap only observes the broker's routing, which delivers to subscribers whether or not any handler is registered.
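To make the path-param analogy concrete, here is a minimal sketch of how a pattern like sensors/{room}/temperature could be turned into a matcher. It is an illustration only, not BlackBull's implementation; the helper names compile_topic_pattern and match_topic are made up for this example.

```python
import re
from typing import Optional


def compile_topic_pattern(pattern: str) -> "re.Pattern[str]":
    """Turn 'sensors/{room}/temperature' into a regex with one named group per placeholder."""
    parts = []
    for level in pattern.split('/'):
        if level.startswith('{') and level.endswith('}'):
            name = level[1:-1]
            # A placeholder captures exactly one topic level, so it never crosses a '/'.
            parts.append(f'(?P<{name}>[^/]+)')
        else:
            # Literal levels must match character for character.
            parts.append(re.escape(level))
    return re.compile('/'.join(parts))


def match_topic(pattern: str, topic: str) -> Optional[dict]:
    """Return the captured levels as a dict, or None when the topic does not match."""
    m = compile_topic_pattern(pattern).fullmatch(topic)
    return m.groupdict() if m else None


print(match_topic('sensors/{room}/temperature', 'sensors/room1/temperature'))
# {'room': 'room1'}
print(match_topic('sensors/{room}/temperature', 'sensors/room1/humidity'))
# None
```

The captured dict is what would end up as keyword arguments such as room='room1' on the handler, the same way a URL router fills in path parameters.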
MQTTExtension registers itself through the single extension seam (app.add_extension) and binds :1883. The core BlackBull class carries zero MQTT-specific code - the broker lives entirely in blackbull.mqtt.
Drive it with the tools you already have
It's a real MQTT 5 broker - CONNECT / SUBSCRIBE / PUBLISH at QoS 0–2, retained messages, and Last-Will - so standard clients just work:
mosquitto_sub -t 'sensors/#' -p 1883 -V 5    # terminal 1

mosquitto_pub -t 'sensors/room1/temperature' \
    -m '21.5' -p 1883 -V 5                   # terminal 2
The message appears in the subscriber's terminal and fires the on_temperature handler above. One process. No apt install mosquitto, no broker sidecar, no C extension.
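Why does a subscription to sensors/# receive a publish to sensors/room1/temperature? The MQTT wildcard rules are short enough to sketch. The function below follows the standard + (one level) and # (this level and everything under it) semantics; it is a stand-alone illustration with a made-up name, filter_matches, and says nothing about how BlackBull's broker stores or looks up subscriptions.

```python
def filter_matches(topic_filter: str, topic: str) -> bool:
    """Check an MQTT topic filter against a concrete topic name."""
    f_levels = topic_filter.split('/')
    t_levels = topic.split('/')

    # Topics starting with '$' (such as $SYS/...) are not matched by a leading wildcard.
    if topic.startswith('$') and f_levels[0] in ('#', '+'):
        return False

    for i, f in enumerate(f_levels):
        if f == '#':
            # Multi-level wildcard: valid only as the last level; it also matches the parent itself.
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False  # the filter is longer than the topic
        if f != '+' and f != t_levels[i]:
            return False  # a literal level differs
    # Without a trailing '#', both sides need the same number of levels.
    return len(f_levels) == len(t_levels)


print(filter_matches('sensors/#', 'sensors/room1/temperature'))              # True
print(filter_matches('sensors/+/temperature', 'sensors/room1/temperature'))  # True
print(filter_matches('sensors/+', 'sensors/room1/temperature'))              # False
```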
When HTTP and MQTT share a process
The real reason to put two protocols in one process isn't cleaner ops - it's shared memory without a sidecar. Here's a tiny temperature dashboard that would normally need a broker, a web server, and a Redis instance to bridge them:
from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension, Message

app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))

# Plain dict - no locks, no Redis, no cross-process serialization.
latest: dict[str, float] = {}

@mqtt.on_message(topic='sensors/{room}/temperature')
async def ingest(msg: Message, room: str):
    latest[room] = float(msg.payload)

@app.route(path='/sensors')
async def dashboard():
    return latest  # {"room1": 21.5, "room2": 30.1}

app.run(port=8000)
A sensor publishes 21.5 to sensors/room1/temperature via MQTT. The broker routes it to subscribers and fires ingest, which updates the dictionary. A browser hits /sensors and sees the current state as JSON. One variable, two protocols, zero glue infrastructure.
This is what "multi-protocol" buys you in practice: the broker and the web server live in the same memory space, so the bridge between them is just a function call that writes to a dict. No redis-py, no pika, no serialization - just Python.
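The "no locks" part is worth seeing in isolation. The sketch below leaves BlackBull out entirely and uses plain asyncio, on the assumption that both handlers run as coroutines on a single event loop: a dict update that contains no await cannot be interleaved with another coroutine, so it needs no lock. The function signatures here are simplified stand-ins, not the framework's.

```python
import asyncio
import json

# Shared state: one plain dict, visible to every coroutine on the loop.
latest: dict = {}


async def ingest(room: str, payload: bytes) -> None:
    # float() accepts bytes directly, so the payload needs no decode step.
    # There is no await inside the update, so no other coroutine can run in the middle of it.
    latest[room] = float(payload)


async def dashboard() -> str:
    # Stand-in for the HTTP handler: serialize whatever the dict holds right now.
    return json.dumps(latest)


async def main() -> str:
    await asyncio.gather(
        ingest('room1', b'21.5'),
        ingest('room2', b'30.1'),
    )
    return await dashboard()


body = asyncio.run(main())
print(body)  # {"room1": 21.5, "room2": 30.1}
```

Two caveats apply to any design like this. The guarantee holds only while each update stays free of await points, and a non-numeric payload makes float() raise ValueError, so a real ingest handler would probably want a try/except around the parse.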
Machine-readable docs for both protocols
BlackBull already auto-generates an OpenAPI 3.1 document from your route signatures:
app.enable_openapi() # publishes /openapi.json and /docs
OpenAPI is an HTTP request/response vocabulary, though - it has nothing to say about topics and publish/subscribe. The messaging world's counterpart is AsyncAPI, and BlackBull ships a generator for that too:
from blackbull.mqtt import AsyncAPIExtension
app.add_extension(AsyncAPIExtension(title='Sensor Gateway', version='1.0.0'))
After app.run() the document is served at /asyncapi.json, with a CDN-hosted HTML viewer at /asyncapi (no new Python dependency). Each on_message topic filter becomes a channel; each callback a receive operation. It's generated lazily, so taps registered after the extension are still documented.
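For a rough idea of what "topic filter becomes a channel, callback becomes a receive operation" means, here is a hand-rolled sketch that builds such a document as a plain dict. It assumes the AsyncAPI 3.0 layout (channels with an address, operations with an action and a channel reference); the post does not say which AsyncAPI version BlackBull emits or how it names channels, so build_asyncapi and the generated IDs are hypothetical and the real /asyncapi.json may look different.

```python
import json
import re


def build_asyncapi(title: str, version: str, taps: dict) -> dict:
    """Build an AsyncAPI-3.0-shaped dict.

    taps maps a topic pattern to the name of the callback registered for it.
    """
    channels = {}
    operations = {}
    for pattern, callback_name in taps.items():
        # Derive a channel ID from the pattern (hypothetical naming scheme).
        channel_id = re.sub(r'[^A-Za-z0-9]+', '_', pattern).strip('_')
        channel = {'address': pattern}
        params = re.findall(r'\{(\w+)\}', pattern)
        if params:
            # Each {placeholder} in the address is described as a channel parameter.
            channel['parameters'] = {
                p: {'description': f'Topic level captured as {p}'} for p in params
            }
        channels[channel_id] = channel
        # One receive operation per callback, pointing at its channel.
        operations[callback_name] = {
            'action': 'receive',
            'channel': {'$ref': f'#/channels/{channel_id}'},
        }
    return {
        'asyncapi': '3.0.0',
        'info': {'title': title, 'version': version},
        'channels': channels,
        'operations': operations,
    }


doc = build_asyncapi(
    'Sensor Gateway', '1.0.0',
    {'sensors/{room}/temperature': 'ingest'},
)
print(json.dumps(doc, indent=2))
```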
Your HTTP surface describes itself with OpenAPI; your MQTT surface describes itself with AsyncAPI; same idea, right vocabulary for each.
The whole arc
Start to finish, that's the story:
- BlackBull decided to be a multi-protocol server.
- It prepared the ground - a protocol-agnostic dispatcher (peek-and-replay), built behind a regression oracle so that nothing in the live HTTP server broke.
- It built on that ground - a lock-free, single-owner MQTT 5 broker on the actor model, with HTTP throughput actually a little higher than before.
- And it stayed one app.py: HTTP and MQTT side by side, sharing memory, each self-documenting - no sidecars, no C extensions.
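If "peek-and-replay" is new to you, the idea fits in a few lines: read the first bytes of a connection, guess the protocol from them, then hand the handler a reader that serves those same bytes again before touching the socket. The sketch below is a synchronous toy over io.BytesIO, with made-up names (detect_protocol, ReplayReader, dispatch) and a deliberately small set of detection rules; part 1 describes the real, asynchronous dispatcher.

```python
import io

HTTP1_METHODS = (b'GET ', b'POST ', b'PUT ', b'DELETE ', b'HEAD ', b'OPTIONS ', b'PATCH ')
HTTP2_PREFACE = b'PRI * HTTP/2.0'


def detect_protocol(head: bytes) -> str:
    """Guess the protocol from the first bytes of a connection."""
    if head[:1] == b'\x10':
        return 'mqtt'   # MQTT CONNECT: packet type 1, flags 0
    if head.startswith(HTTP2_PREFACE):
        return 'http2'  # start of the HTTP/2 connection preface
    if head.startswith(HTTP1_METHODS):
        return 'http1'  # an HTTP/1.x request line
    return 'unknown'


class ReplayReader:
    """Serves the peeked bytes first, then continues from the underlying stream."""

    def __init__(self, stream, peeked: bytes):
        self._stream = stream
        self._buffer = peeked

    def read(self, n: int) -> bytes:
        data, self._buffer = self._buffer[:n], self._buffer[n:]
        if len(data) < n:
            data += self._stream.read(n - len(data))
        return data


def dispatch(stream, peek_size: int = 16):
    """Peek at the stream, pick a protocol, and return a reader that replays the peek."""
    peeked = stream.read(peek_size)
    return detect_protocol(peeked), ReplayReader(stream, peeked)


http_wire = b'GET / HTTP/1.1\r\nHost: example\r\n\r\n'
protocol, reader = dispatch(io.BytesIO(http_wire))
print(protocol)                                   # http1
print(reader.read(len(http_wire)) == http_wire)   # True: the handler sees every byte
```

The point of the replay step is that the protocol handler never knows a peek happened, which is what lets the existing HTTP code run unchanged behind the dispatcher.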
What's next
MQTT is the first consumer of the multi-protocol seam, not the last. @app.raw_handler already lets you bind raw TCP to a port and handle the reader/writer yourself for custom protocols. A first-class gRPC handler - riding the existing HTTP/2 stream, flow-control, and trailers machinery - is the natural next step. Same pattern: register a ProtocolBinding, let the core detect and dispatch. The core won't have to learn a thing about gRPC. That's the whole point.
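"Handle the reader/writer yourself" means roughly the following. The exact signature of @app.raw_handler is not shown in this post, so the sketch uses asyncio's own start_server as a stand-in; the handler body, a toy line protocol that upper-cases whatever it receives, is the kind of code such a handler would contain. The names handle_line_protocol and demo are invented for the example.

```python
import asyncio


async def handle_line_protocol(reader: asyncio.StreamReader,
                               writer: asyncio.StreamWriter) -> None:
    """Toy custom protocol: answer every received line with its upper-cased form."""
    while line := await reader.readline():  # readline() returns b'' at EOF
        writer.write(line.upper())
        await writer.drain()
    writer.close()
    await writer.wait_closed()


async def demo() -> bytes:
    # Port 0 asks the OS for any free port, so the demo does not collide with a real service.
    server = await asyncio.start_server(handle_line_protocol, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        reader, writer = await asyncio.open_connection('127.0.0.1', port)
        writer.write(b'ping\n')
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply


print(asyncio.run(demo()))  # b'PING\n'
```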
Try it
pip install 'blackbull[mqtt]'
python examples/mqtt_broker.py
Source: github.com/TOKUJI/BlackBull
Docs: tokuji.github.io/BlackBull