Hacker News

Backon – Python retry (zero deps, circuit breaker, async native)

Features

  • Zero dependencies - pure Python, stdlib only
  • Four APIs - decorator (@on_exception, @on_predicate), functional (retry()), context manager (Retrying), callable (RetryingCaller/AsyncRetryingCaller)
  • Async native - same API works for async def functions
  • Full type hints - validated with mypy, strict mode compatible
  • Global toggle - backon.disable() / backon.enable() for testing
  • Custom sleep - inject your own sleep function (useful for testing with asyncio.Event)
  • Multiple wait strategies - exponential, constant, Fibonacci, decay, runtime, randomized, incremental, and composable chains
  • Jitter - full jitter, random jitter, or none
  • Rich callbacks - on_attempt, on_backoff, on_success, on_giveup, before_sleep, before, after
  • Circuit breaker - CLOSED/OPEN/HALF_OPEN states with automatic recovery
  • Hedging - concurrent retry requests, first-success-wins
  • Prometheus / OpenTelemetry metrics - optional, zero hard dependencies
  • Testing module - disable_retries(), limit_retries(), remove_backoff(), assert_retried()
  • Trio support - retry with the trio async framework
  • Operator overloading - compose stops with |/&, wait generators with +
  • Iterator API - for attempt in Retrying(...):
  • Modern packaging - PEP 621, PDM, py.typed

Installation

pip install backon

Requires Python 3.10+.

Quick Start

import backon

@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch_data():
    return api.call()

@backon.on_predicate(backon.constant, max_tries=5, interval=0.5)
def poll_status():
    return check_ready()

result = backon.retry(
    fetch_data,
    backon.expo,
    exception=ValueError,
    max_tries=3,
)

with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
    result = r.call(fetch_data)

Async variant:

async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
    result = await r.async_call(fetch_data)

API Reference

@on_exception

Retry when the decorated function raises one of the specified exceptions.

@backon.on_exception(backon.expo, (ValueError, TimeoutError), max_tries=5)
def fetch():
    ...
Argument Type Default Description
wait_gen WaitGenerator - Wait strategy (expo, constant, fibo, etc.)
exception type or tuple[type] - Exception class(es) to retry on
max_tries int or Callable[[], int] None Maximum number of attempts
max_time float, timedelta, or Callable None Maximum total elapsed time
jitter Jitterer or None full_jitter Jitter function
giveup Callable[[Exception], bool or float] lambda e: False Stop retrying for matching exceptions; return float to override wait
on_success Handler or list None Called after successful attempt
on_backoff Handler or list None Called before each retry
on_giveup Handler or list None Called when retries exhausted
on_attempt Handler or list None Called before each attempt
before_sleep Handler or list None Called before sleeping
before Handler or list None Called before each attempt (lower-level than on_attempt)
after Handler or list None Called after each attempt (lower-level than on_success/on_giveup)
retry_error_callback Callable[[dict], Any] None Called when retry gives up instead of raising
raise_on_giveup bool True Raise final exception when giving up
logger str or Logger "backon" Logger name or instance
backoff_log_level int logging.INFO Log level for backoff messages
giveup_log_level int logging.ERROR Log level for giveup messages
sleep Callable[[float], Any] None Custom sleep function
**wait_gen_kwargs varies - Extra kwargs passed to the wait generator (e.g. base=3, interval=0.5)

@on_predicate

Retry while the predicate matches the return value.

@backon.on_predicate(backon.constant, predicate=lambda x: x is None, max_tries=5)
def poll():
    ...

Accepts all parameters from on_exception except exception, giveup, and raise_on_giveup. Adds:

Argument Type Default Description
predicate Callable[[Any], bool] operator.not_ Retry when this returns True for the return value

retry()

result = backon.retry(
    target=my_function,
    wait_gen=backon.expo,
    exception=ValueError,
    max_tries=3,
)

Accepts all parameters from on_exception plus on_predicate extras, plus:

Argument Type Default Description
condition RetryCondition None Advanced retry condition object
stop Stop None Advanced stop condition object
name str "" Identifier for the retry call
**wait_gen_kwargs varies - Extra kwargs passed to the wait generator

If target is a coroutine function, retry() returns a coroutine. Otherwise it returns the result synchronously.

Retrying (Context Manager)

with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
    r.call(my_function)

async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
    await r.async_call(my_async_function)
Method Description
call(target, *args, **kwargs) Execute synchronously
async_call(target, *args, **kwargs) Execute asynchronously
copy() Return a modified copy of the Retrying instance
statistics Property returning dict with attempt_number, elapsed, idle_for, start_time
call_state Property returning the current RetryCallState
enabled Property to enable/disable retry per-instance

Arguments: Same as retry(), plus enabled (default True).

RetryingCaller / AsyncRetryingCaller

A callable object with pre-bound exception type via .on().

caller = backon.RetryingCaller(backon.expo, max_tries=3)
caller = caller.on(ValueError)
result = caller(my_function, arg1, arg2)

Async variant of RetryingCaller:

caller = backon.AsyncRetryingCaller(backon.expo, max_tries=3).on(ValueError)
result = await caller(my_async_function, arg1, arg2)
Method Description
.on(exception) Return a copy bound to the given exception type
.copy() Return a modified copy
.__call__(target, *args, **kwargs) Execute with retry

Wait Generators

All wait generators are callables that produce a sequence of wait times. Pass extra kwargs (e.g. interval=0.5, base=3) as **wait_gen_kwargs to decorators and functions.

Generator Signature Description
expo (base=2, factor=1, max_value=None) Exponential backoff: factor * base^n
constant (interval=1) Fixed interval; accepts float or Sequence[float] for varied intervals
fibo (max_value=None) Fibonacci sequence: 1, 1, 2, 3, 5, 8, ...
runtime (value=Callable) Dynamic wait from return value or exception - useful for Retry-After headers
decay (initial_value=1, decay_factor=1, min_value=None) Exponential decay: initial * e^(-t * decay_factor)
wait_random_exponential (multiplier=1, max_value=None, exp_base=2, min_value=0) Randomized exponential (uniform random between 0 and the exponential value)
wait_incrementing (start=1, increment=1, max_value=None) Linear increment: start + n * increment
wait_chain (*generators) Sequentially play through multiple generators
wait_exception (value=Callable) Dynamic wait based on the caught exception
wait_random (min=0, max=1) Uniform random wait between min and max
wait_exponential_jitter (initial=1, max=60, exp_base=2, jitter=1) Exponential backoff with added random jitter
wait_none () Always returns 0 (no wait)

Composition: Combine wait generators with +:

wait_strategy = backon.expo(base=3) + backon.constant(interval=0.5)

Stop Conditions

Stop conditions determine when retry should cease. They can be composed with | (any) and & (all).

Condition Description
stop_after_attempt(max_attempts) Stop after N attempts
stop_after_delay(max_delay) Stop after total elapsed time exceeds max_delay seconds
stop_before_delay(max_delay) Stop if the next wait would exceed max_delay
stop_all(*stops) Stop when all sub-conditions are met
stop_any(*stops) Stop when any sub-condition is met
stop_never() Never stop (retry indefinitely)
stop_when_event_set(event) Stop when a threading.Event is set
from backon import stop_after_attempt, stop_after_delay, stop_any

stop = stop_after_attempt(5) | stop_after_delay(30.0)

Retry Conditions

Retry conditions determine whether a retry should happen. They can be composed with | and &.

Condition Description
retry_if_exception_type(*types) Retry if exception is an instance of given type(s)
retry_if_exception(predicate) Retry if the exception matches a custom predicate
retry_if_exception_message(message, match=None) Retry if exception message contains a string (or matches regex with match="re")
retry_if_result(predicate) Retry if the return value matches a predicate
retry_if_not_result(predicate) Retry if the return value does NOT match a predicate
retry_all(*conditions) Retry only when all conditions pass
retry_any(*conditions) Retry when any condition passes
retry_always() Always retry
retry_never() Never retry
from backon import retry_if_exception_type, retry_if_exception_message, retry_all

condition = retry_all(
    retry_if_exception_type(HTTPError),
    retry_if_exception_message("429"),
)

Jitter

@backon.on_exception(backon.expo, ValueError, jitter=backon.full_jitter)
def f():
    ...
Jitter Effect
backon.full_jitter Random value between 0 and the calculated wait time
backon.random_jitter Adds random() to the calculated wait time (~+0.5s on average)
None No jitter (deterministic waits)

Handlers

Handlers receive a details dict with contextual information:

def handler(details):
    print(f"Attempt {details['tries']}, elapsed {details['elapsed']:.2f}s")

@backon.on_exception(
    backon.expo,
    ValueError,
    max_tries=3,
    on_attempt=handler,
    on_backoff=handler,
    on_success=handler,
    on_giveup=handler,
)
def f():
    ...

Available keys in details:

Key Available in
target All
args, kwargs All
tries All
elapsed All
value on_success, on_backoff, on_giveup
exception on_backoff, on_giveup
wait on_backoff, before_sleep

Global Toggle

Useful in tests to disable retry logic globally:

backon.disable()  # skip retry, call function directly
backon.enable()   # re-enable retry

Per-instance toggle via Retrying.enabled:

r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
r.enabled = False
result = r.call(fn)  # no retry

Async Support

All three APIs work with async functions transparently:

@backon.on_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
    return await api.call()

result = await backon.retry(fetch, backon.expo, exception=ValueError, max_tries=3)

async with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
    result = await r.async_call(fetch)

Custom Sleep

Replace the default sleep for testing or special environments:

@backon.on_exception(
    backon.expo,
    ValueError,
    max_tries=3,
    sleep=lambda s: print(f"waiting {s}s"),
)
def f():
    ...

# With asyncio.Event for testing
import asyncio

event = asyncio.Event()

@backon.on_exception(
    backon.expo,
    ValueError,
    max_tries=3,
    sleep=backon.sleep_using_event(event),
)
async def f():
    ...

Advanced Features

Circuit Breaker

Circuit breaker with three states: CLOSED (normal), OPEN (failing), HALF_OPEN (testing recovery).

from backon._circuit_breaker import CircuitBreaker, BreakerRetrying, CircuitOpenError

breaker = BreakerRetrying(
    backon.expo,
    max_tries=3,
    breaker=CircuitBreaker(
        failure_threshold=5,
        recovery_timeout=60.0,
        half_open_max_calls=1,
    ),
)

try:
    result = breaker.call(fetch)
except CircuitOpenError:
    print("Circuit is open, skipping request")
CircuitBreaker parameter Default Description
failure_threshold 5 Consecutive failures before opening the circuit
recovery_timeout 60.0 Seconds before transitioning from OPEN to HALF_OPEN
half_open_max_calls 1 Allowed calls in HALF_OPEN state before fully closing
name "" Identifier for the breaker

Hedging

Run multiple retry attempts concurrently and return the first success.

from backon._hedging import hedge, HedgingRetrying

# Functional
result = hedge(fetch, backon.expo, max_hedge=3)

# Decorator
@backon.on_hedge(backon.expo, max_hedge=3)
def fetch():
    ...

# Context manager
with HedgingRetrying(backon.expo, max_hedge=3) as h:
    result = h.call(fetch)
Parameter Default Description
max_hedge 3 Number of concurrent hedged requests
timeout None Maximum time to wait for any hedge
on_hedge None Callback when a hedge request is sent

Prometheus / OpenTelemetry Metrics

Optional Prometheus and OpenTelemetry metrics. Requires prometheus_client or opentelemetry-api to be installed.

from backon._instrumentation import PrometheusMetrics, OTelMetrics, set_metrics_collector

# Prometheus
set_metrics_collector(PrometheusMetrics())

# OpenTelemetry
set_metrics_collector(OTelMetrics(meter_name="myapp.backon"))

Metrics collected:

  • backon_retry_attempts_total (attempts, labeled by target and exception type)
  • backon_retry_success_total (successes)
  • backon_retry_failure_total (failures)
  • backon_circuit_breaker_open_total / backon_circuit_breaker_close_total
  • backon_hedge_requests_total
  • backon.retry.attempt_duration (histogram, OTel only)

Testing Module

from backon._testing import (
    disable_retries,
    enable_retries,
    test_config,
    limit_retries,
    remove_backoff,
    assert_retried,
    assert_not_retried,
)

# Context manager that skips retry for a block
with disable_retries():
    result = fetch()

# Limit max retries in tests
with limit_retries(2):
    fetch()

# Remove backoff delay entirely
with remove_backoff():
    fetch()

# Assert the function was retried N times
assert_retried(fetch, expected_tries=3)

Trio Support

Retry with the trio async framework.

Comments

No comments yet. Start the discussion.