DEV Community

BlackBull goes multi-protocol (part 3) - one app.py, two protocols

The first two posts were the engineering: part 1 rebuilt the connection dispatcher so BlackBull's core stopped assuming HTTP, and part 2 put a lock-free, single-owner MQTT 5 broker on top of that seam. This post is the payoff - what all of it looks like from an application author's chair.

The short version: one file, one pip install, two protocols. One decorator for HTTP, one for MQTT.

from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension, Message

app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))

@app.route(path='/')
async def index():
    return "HTTP here; MQTT broker on :1883."

@mqtt.on_message(topic='sensors/{room}/temperature')
async def on_temperature(msg: Message, room: str):
    print(f"{room}: {msg.payload.decode()}")  # {room} captured like a path param

app.run(port=8000)  # HTTP on 8000, MQTT on 1883

mqtt.on_message deliberately mirrors app.route: both decorate an async function, both match on an address pattern, and {room} captures a topic level the same way {task_id} captures a URL path segment. The difference is semantics - an HTTP route is the response; an MQTT tap observes the broker's routing, which delivers to subscribers whether or not any handler is registered.

MQTTExtension registers itself through the single extension seam (app.add_extension) and binds :1883. The core BlackBull class carries zero MQTT-specific code - the broker lives entirely in blackbull.mqtt.

Drive it with the tools you already have

It's a real MQTT 5 broker - CONNECT / SUBSCRIBE / PUBLISH at QoS 0–2, retained messages, and Last-Will - so standard clients just work:

mosquitto_sub -t 'sensors/#' -p 1883 -V 5  # terminal 1
mosquitto_pub -t 'sensors/room1/temperature' \
  -m '21.5' -p 1883 -V 5  # terminal 2

The message appears in the subscriber's terminal and fires the on_temperature handler above. One process. No apt install mosquitto, no broker sidecar, no C extension.

When HTTP and MQTT share a process

The real reason to put two protocols in one process isn't cleaner ops - it's shared memory without a sidecar. Here's a tiny temperature dashboard that would normally need a broker, a web server, and a Redis instance to bridge them:

from blackbull import BlackBull
from blackbull.mqtt import MQTTExtension, Message

app = BlackBull()
mqtt = app.add_extension(MQTTExtension(port=1883))

# Plain dict - no locks, no Redis, no cross-process serialization.
latest: dict[str, float] = {}

@mqtt.on_message(topic='sensors/{room}/temperature')
async def ingest(msg: Message, room: str):
    latest[room] = float(msg.payload)

@app.route(path='/sensors')
async def dashboard():
    return latest  # {"room1": 21.5, "room2": 30.1}

app.run(port=8000)

A sensor publishes 21.5 to sensors/room1/temperature via MQTT. The broker routes it to subscribers and fires ingest, which updates the dictionary. A browser hits /sensors and sees the current state as JSON. One variable, two protocols, zero glue infrastructure.

This is what "multi-protocol" buys you in practice: the broker and the web server live in the same memory space, so the bridge between them is just a function call that writes to a dict. No redis-py, no pika, no serialization - just Python.

Machine-readable docs for both protocols

BlackBull already auto-generates an OpenAPI 3.1 document from your route signatures:

app.enable_openapi()  # publishes /openapi.json and /docs

OpenAPI is an HTTP request/response vocabulary, though - it has nothing to say about topics and publish/subscribe. The messaging world's counterpart is AsyncAPI, and BlackBull ships the counterpart:

from blackbull.mqtt import AsyncAPIExtension

app.add_extension(AsyncAPIExtension(title='Sensor Gateway', version='1.0.0'))

After app.run() the document is served at /asyncapi.json, with a CDN-hosted HTML viewer at /asyncapi (no new Python dependency). Each on_message topic filter becomes a channel; each callback a receive operation. It's generated lazily, so taps registered after the extension are still documented.

Your HTTP surface describes itself with OpenAPI; your MQTT surface describes itself with AsyncAPI; same idea, right vocabulary for each.

The whole arc

Start to finish, that's the story:

  • BlackBull decided to be a multi-protocol server.
  • It prepared the ground - a protocol-agnostic dispatcher (peek-and-replay), built behind a regression oracle so the live HTTP server broke nothing.
  • It built on that ground - a lock-free, single-owner MQTT 5 broker on the actor model, with HTTP throughput actually a little higher than before.
  • And it stayed one app.py: HTTP and MQTT side by side, sharing memory, each self-documenting - no sidecars, no C extensions.

What's next

MQTT is the first consumer of the multi-protocol seam, not the last. @app.raw_handler already lets you bind raw TCP to a port and handle the reader/writer yourself for custom protocols. A first-class gRPC handler - riding the existing HTTP/2 stream, flow-control, and trailers machinery - is the natural next step. Same pattern: register a ProtocolBinding, let the core detect and dispatch. The core won't have to learn a thing about gRPC. That's the whole point.

Try it

pip install 'blackbull[mqtt]'
python examples/mqtt_broker.py

Source: github.com/TOKUJI/BlackBull
Docs: tokuji.github.io/BlackBull

Comments

No comments yet. Start the discussion.