Real-Time Network Telemetry for AI: Building an Asynchronous NetFlow/sFlow Ingestion Pipeline in Python
The Architectural Challenge: Avoid the Bottleneck
The biggest pitfall when writing a packet sniffer in Python is blocking the capture loop. If your script captures a packet, parses it, formats a JSON string, and logs it in sequence, it cannot keep up with a sudden burst of network traffic and will drop thousands of packets. To feed an AI infrastructure safely, we must decouple the Capture Phase from the Processing Phase using a multi-threaded Producer-Consumer pattern.
                  [ Wireless Adapter ]
                            │
                            ▼ (Raw Packets)
┌────────────────────────────────────────────────────────┐
│ PHASE 1: Capture (Scapy AsyncSniffer Thread)           │
│ - Sniffs raw frames in a non-blocking background loop  │
└────────────────────────────────────────────────────────┘
                            │
                            ▼ (Fast Handoff: .put_nowait)
┌────────────────────────────────────────────────────────┐
│ PHASE 2: Buffer (Thread-Safe Bounded Queue)            │
│ - Acts as a shock absorber for network traffic bursts  │
└────────────────────────────────────────────────────────┘
                            │
                            ▼ (Distributed to Worker Pool)
┌────────────────────────────────────────────────────────┐
│ PHASE 3: Process & Emit (Concurrent Thread Workers)    │
│ - Worker 1 │ Worker 2 │ Worker 3 │ Worker 4            │
│ - Decodes Layers (IP, TCP, UDP)                        │
│ - Updates Stateful NetFlow Cache (Aggregates)          │
│ - Outputs Structured JSON Streaming Event              │
└────────────────────────────────────────────────────────┘
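The three phases can be tried out without any network access. The following is a minimal sketch using only the standard library, with byte strings standing in for packets; `run_pipeline`, its parameters, and the sentinel-based shutdown are illustrative assumptions, not part of the pipeline presented later.

```python
import queue
import threading


def run_pipeline(items, num_workers=4, max_queue_size=100):
    """Push items through a bounded queue to a pool of worker threads.

    Returns (results, dropped). Hypothetical helper for illustration only.
    """
    buffer = queue.Queue(maxsize=max_queue_size)  # Phase 2: bounded buffer
    results = []
    results_lock = threading.Lock()
    dropped = 0
    stop = object()  # sentinel that tells a worker to exit

    def worker():
        # Phase 3: consume until the sentinel arrives
        while True:
            item = buffer.get()
            if item is stop:
                break
            processed = len(item)  # stand-in for decoding and aggregation
            with results_lock:
                results.append(processed)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    # Phase 1: the producer never blocks; a full buffer means a dropped item
    for item in items:
        try:
            buffer.put_nowait(item)
        except queue.Full:
            dropped += 1

    # Blocking put here so that every worker is guaranteed a sentinel
    for _ in threads:
        buffer.put(stop)
    for t in threads:
        t.join()
    return results, dropped
```

Calling `run_pipeline([b"x" * 10] * 50)` processes all fifty items, because the buffer is larger than the burst. Shrinking `max_queue_size` below the burst size makes drops possible whenever the workers lag behind the producer.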
Production-Grade Pipeline Implementation
Below is the complete, modern Python implementation utilizing Scapy's explicit layer APIs, asynchronous execution, and thread-safe buffering.
Prerequisites
Ensure you have the native packet capture library installed on your host system (libpcap for Linux/macOS, Npcap for Windows) and install Scapy:
pip install scapy
The Ingestion Code
import os
import sys
import queue
import time
import json
import threading
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Tuple, Optional

# Import only the Scapy names this script needs
from scapy.all import AsyncSniffer, Packet, conf
from scapy.layers.inet import IP, TCP, UDP

# --- CONFIGURATION ---
MAX_QUEUE_SIZE = 10000
NUM_WORKERS = 4
# Replace with your active network interface: a name string, or an integer
# index (the index is only resolved to a device on Windows, see __main__)
INTERFACE: Optional[str | int] = 24


# --- DATA MODELS ---
@dataclass(frozen=True)
class FlowKey:
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int
    protocol: int


@dataclass
class FlowMetrics:
    packet_count: int = 0
    byte_count: int = 0
    first_seen: float = 0.0
    last_seen: float = 0.0


# --- INGESTION PIPELINE ENGINE ---
class AIStreamIngestionPipeline:
    def __init__(self, interface: Optional[str | int] = None):
        self.interface = interface
        self.packet_queue: queue.Queue[Packet] = queue.Queue(maxsize=MAX_QUEUE_SIZE)
        self.flow_cache: Dict[FlowKey, FlowMetrics] = {}
        # The flow cache is shared by every worker thread, so guard it with a lock
        self.cache_lock = threading.Lock()
        self.executor = ThreadPoolExecutor(max_workers=NUM_WORKERS)
        self.is_running = False
        self.sniffer: Optional[AsyncSniffer] = None

    def _extract_packet_meta(self, packet: Packet) -> Optional[Tuple[FlowKey, int]]:
        """Builds the 5-tuple flow key and packet length; returns None for non-IP frames."""
        if IP not in packet:
            return None

        ip_layer = packet[IP]
        src_port, dst_port = 0, 0

        # Ports stay 0 for IP protocols other than TCP and UDP (e.g. ICMP)
        if TCP in packet:
            src_port = packet[TCP].sport
            dst_port = packet[TCP].dport
        elif UDP in packet:
            src_port = packet[UDP].sport
            dst_port = packet[UDP].dport

        key = FlowKey(
            src_ip=ip_layer.src,
            dst_ip=ip_layer.dst,
            src_port=src_port,
            dst_port=dst_port,
            protocol=ip_layer.proto
        )
        return key, len(packet)

    def _worker_process_queue(self) -> None:
        """Consumer loop run by each worker thread: aggregates flows and emits JSON."""
        while self.is_running or not self.packet_queue.empty():
            try:
                packet = self.packet_queue.get(timeout=0.5)
            except queue.Empty:
                continue

            meta = self._extract_packet_meta(packet)
            if not meta:
                self.packet_queue.task_done()
                continue

            key, packet_len = meta
            current_time = time.time()

            # 1. Stateful Aggregation (NetFlow Telemetry Layer)
            # Update and snapshot under the lock so counters stay consistent
            with self.cache_lock:
                if key not in self.flow_cache:
                    self.flow_cache[key] = FlowMetrics(first_seen=current_time)
                metrics = self.flow_cache[key]
                metrics.packet_count += 1
                metrics.byte_count += packet_len
                metrics.last_seen = current_time
                metrics_snapshot = asdict(metrics)

            # 2. Structured Streaming Payload (sFlow Event Layer)
            ingest_payload = {
                "event_type": "network_flow_sample",
                "timestamp": current_time,
                "flow_key": asdict(key),
                "packet_size_bytes": packet_len,
                "aggregate_metrics": metrics_snapshot
            }

            # Emit clean JSON to stdout (pipe into a stream wrapper, Kafka, or vector db)
            print(json.dumps(ingest_payload))
            self.packet_queue.task_done()

    def _enqueue_packet(self, packet: Packet) -> None:
        """Producer callback invoked by the sniffer; hands the packet off without blocking."""
        try:
            self.packet_queue.put_nowait(packet)
        except queue.Full:
            # Backpressure handling: silently drop the packet when the queue is full
            pass

    def start(self) -> None:
        """Submits the worker loops and starts the background sniffer thread."""
        print(f"[*] Initializing pipeline on interface: {self.interface or 'Default Active OS Interface'}", file=sys.stderr)
        self.is_running = True

        for _ in range(NUM_WORKERS):
            self.executor.submit(self._worker_process_queue)

        self.sniffer = AsyncSniffer(
            iface=self.interface,
            prn=self._enqueue_packet,
            store=False  # Crucial: stops Scapy from keeping its own list of captured packets
        )
        self.sniffer.start()
        print("[*] Ingestion pipeline is live and streaming...", file=sys.stderr)

    def stop(self) -> None:
        """Stops the sniffer, lets workers drain the queue, then shuts the pool down."""
        print("\n[*] Shutting down ingestion pipeline gracefully...", file=sys.stderr)
        if self.sniffer:
            self.sniffer.stop()
        self.is_running = False
        self.executor.shutdown(wait=True)
        print("[*] Pipeline stopped cleanly.", file=sys.stderr)


if __name__ == "__main__":
    # Administrative privilege guard for raw socket access
    if os.name != 'nt' and os.getuid() != 0:
        print("[-] Critical Error: Root privileges required for raw network socket access.", file=sys.stderr)
        sys.exit(1)

    # Windows Device GUID Resolution block
    target_interface = INTERFACE
    if os.name == 'nt' and isinstance(INTERFACE, int):
        try:
            target_interface = conf.ifaces.dev_from_index(INTERFACE)
            print(f"[*] Resolved index {INTERFACE} to device: {target_interface.description}", file=sys.stderr)
        except Exception as e:
            print(f"[-] Warning: Could not resolve interface index {INTERFACE}: {e}. Falling back.", file=sys.stderr)
            target_interface = None  # Actually fall back to Scapy's default interface

    pipeline = AIStreamIngestionPipeline(interface=target_interface)
    try:
        pipeline.start()
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pipeline.stop()
Under the Hood: Why This Architecture Works for AI
1. Modern Container Layer Membership
Instead of calling packet.haslayer(IP), this script tests membership with IP in packet. Scapy's Packet class implements Python's __contains__ method, so the in operator performs the same layer lookup as haslayer(). The benefit is readability rather than speed.
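The `in` operator is ordinary Python dispatch: `X in obj` calls `obj.__contains__(X)`. The toy class below sketches one way a packet type can route that call to a `haslayer`-style check. `ToyPacket` and the empty `IP`, `TCP`, and `UDP` classes are hypothetical stand-ins so the example runs without Scapy; they are not Scapy's classes.

```python
class IP:
    """Hypothetical placeholder layer (not scapy.layers.inet.IP)."""


class TCP:
    """Hypothetical placeholder layer."""


class UDP:
    """Hypothetical placeholder layer."""


class ToyPacket:
    """Toy layered packet used only to illustrate operator dispatch."""

    def __init__(self, *layers):
        self.layers = layers

    def haslayer(self, layer_cls) -> bool:
        # Linear scan over the layers for an instance of the requested class
        return any(isinstance(layer, layer_cls) for layer in self.layers)

    def __contains__(self, layer_cls) -> bool:
        # `layer_cls in packet` lands here and delegates to haslayer()
        return self.haslayer(layer_cls)


pkt = ToyPacket(IP(), TCP())
has_ip = IP in pkt       # dispatches to ToyPacket.__contains__
has_udp = UDP in pkt     # same lookup, different layer class
```

Both spellings run the same lookup in this sketch, so choosing between them is a matter of style.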
2. Bounded Capture Memory (store=False)
By default, Scapy's sniffer keeps every captured packet in an in-memory list. Running a default sniffer on a busy enterprise link for an hour lets that list grow without bound until the process runs out of memory. By setting store=False in our AsyncSniffer, Scapy keeps no copy of its own: each packet is handed to our callback, placed on the bounded queue, and becomes eligible for garbage collection as soon as a worker has finished with it.
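Captured packets are not the only thing that can accumulate: a flow cache that only ever adds keys also grows for as long as new connections appear. NetFlow-style exporters usually expire flows that have gone idle. The sketch below shows that idea on a plain dictionary. `evict_idle_flows`, the `idle_timeout` parameter, and the dictionary-shaped metrics are assumptions for illustration and are not taken from the script above.

```python
import time
from typing import Dict, Hashable, List, Optional, Tuple


def evict_idle_flows(
    flow_cache: Dict[Hashable, dict],
    idle_timeout: float,
    now: Optional[float] = None,
) -> List[Tuple[Hashable, dict]]:
    """Remove and return flows whose last_seen is older than idle_timeout seconds.

    Hypothetical helper: each cache value is assumed to be a dict that
    contains a "last_seen" timestamp in seconds.
    """
    now = time.time() if now is None else now
    # Collect keys first so the dict is not mutated while being iterated
    expired = [
        key for key, metrics in flow_cache.items()
        if now - metrics["last_seen"] > idle_timeout
    ]
    return [(key, flow_cache.pop(key)) for key in expired]
```

If the cache is shared between threads, the caller would need to hold the same lock that protects updates while this function runs. The returned records could be emitted as final flow summaries before they are discarded.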
3. Backpressure Protection via Bounded Queues
The packet queue is capped at maxsize=10000, and the producer uses the non-blocking put_nowait() call. If the workers fall behind, for example because the downstream data processor (your LLM evaluator or vector engine) experiences a latency hiccup, the pipeline intentionally drops incoming packets at the edge rather than letting the memory buffer balloon out of control. The trade-off is that these drops are silent, so flow counters undercount during overload.
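Dropping at the edge is easier to operate when the drops are counted. The following sketch wraps a bounded `queue.Queue` and tracks accepted and dropped items; `DropCountingBuffer` and its method names are hypothetical, and the counters assume a single producer thread, as with one sniffer callback.

```python
import queue


class DropCountingBuffer:
    """Bounded buffer that records how many items were dropped when full.

    Hypothetical wrapper for illustration; counters are only safe to
    update from one producer thread.
    """

    def __init__(self, maxsize: int):
        self._queue = queue.Queue(maxsize=maxsize)
        self.accepted = 0
        self.dropped = 0

    def offer(self, item) -> bool:
        """Try to enqueue without blocking; return False if the item was dropped."""
        try:
            self._queue.put_nowait(item)
        except queue.Full:
            self.dropped += 1
            return False
        self.accepted += 1
        return True

    def take(self, timeout: float = 0.5):
        """Blocking get with a timeout; raises queue.Empty if nothing arrives."""
        return self._queue.get(timeout=timeout)

    def drop_ratio(self) -> float:
        """Fraction of offered items that were dropped (0.0 if none offered)."""
        total = self.accepted + self.dropped
        return self.dropped / total if total else 0.0
```

Logging `drop_ratio()` periodically to stderr would show when the bounded queue is saturated, which is a signal to add workers, enlarge the queue, or sample the traffic.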
4. Machine-Readable Feature Matrix Output
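The pipeline aggregates per-connection counters in a cache keyed by the 5-tuple (source IP, destination IP, source port, destination port, protocol) and emits one JSON event per packet carrying both that packet's size and the flow's running totals. This gives your downstream AI immediate access to critical parameters: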
{
  "event_type": "network_flow_sample",
  "timestamp": 1719409375.42,
  "flow_key": {
    "src_ip": "10.17.245.72",
    "dst_ip": "142.250.180.142",
    "src_port": 54211,
    "dst_port": 443,
    "protocol": 6
  },
  "packet_size_bytes": 1420,
  "aggregate_metrics": {
    "packet_count": 42,
    "byte_count": 59640,
    "first_seen": 1719409371.11,
    "last_seen": 1719409375.42
  }
}
This metadata gives your AI system a flow-level view of network behavior, so agents can cross-reference anomalies without filling your feature store with costly raw packet captures.
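On the consuming side, each emitted line can be turned into numeric features with a few lines of standard-library code. The sketch below assumes one JSON event per line with the field names shown in the sample event; `flow_features` and the derived feature names are illustrative choices, not part of the pipeline.

```python
import json


def flow_features(event_line: str) -> dict:
    """Derive simple per-flow features from one JSON event line.

    Hypothetical helper; expects the "flow_key" and "aggregate_metrics"
    fields used in the sample event.
    """
    event = json.loads(event_line)
    agg = event["aggregate_metrics"]
    duration = agg["last_seen"] - agg["first_seen"]
    packets = agg["packet_count"]
    return {
        # Readable identifier built from the 5-tuple
        "flow_id": "{src_ip}:{src_port}->{dst_ip}:{dst_port}/{protocol}".format(
            **event["flow_key"]
        ),
        "duration_s": duration,
        "mean_packet_bytes": agg["byte_count"] / packets if packets else 0.0,
        # Guard against single-packet flows, where duration is zero
        "bytes_per_second": agg["byte_count"] / duration if duration > 0 else 0.0,
    }
```

A consumer could read the pipeline's stdout line by line, call `flow_features` on each line, and pass the resulting dictionaries to whatever model or store sits downstream.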