Backon – Python retry (zero deps, circuit breaker, async native)
Features
- Zero dependencies - pure Python, stdlib only
- Four APIs - decorator (
@on_exception,@on_predicate), functional (retry()), context manager (Retrying), callable (RetryingCaller/AsyncRetryingCaller) - Async native - same API works for
async deffunctions - Full type hints - validated with mypy, strict mode compatible
- Global toggle -
backon.disable()/backon.enable()for testing - Custom sleep - inject your own sleep function (useful for testing with
asyncio.Event) - Multiple wait strategies - exponential, constant, Fibonacci, decay, runtime, randomized, incremental, and composable chains
- Jitter - full jitter, random jitter, or none
- Rich callbacks -
on_attempt,on_backoff,on_success,on_giveup,before_sleep,before,after - Circuit breaker - CLOSED/OPEN/HALF_OPEN states with automatic recovery
- Hedging - concurrent retry requests, first-success-wins
- Prometheus / OpenTelemetry metrics - optional, zero hard dependencies
- Testing module -
disable_retries(),limit_retries(),remove_backoff(),assert_retried() - Trio support - retry with the trio async framework
- Operator overloading - compose stops with
|/&, wait generators with+ - Iterator API -
for attempt in Retrying(...): - Modern packaging - PEP 621, PDM,
py.typed
Installation
pip install backon
Requires Python 3.10+.
Quick Start
import backon
@backon.on_exception(backon.expo, ValueError, max_tries=3)
def fetch_data():
return api.call()
@backon.on_predicate(backon.constant, max_tries=5, interval=0.5)
def poll_status():
return check_ready()
result = backon.retry(
fetch_data,
backon.expo,
exception=ValueError,
max_tries=3,
)
with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
result = r.call(fetch_data)
Async variant:
async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
result = await r.async_call(fetch_data)
API Reference
@on_exception
Retry when the decorated function raises one of the specified exceptions.
@backon.on_exception(backon.expo, (ValueError, TimeoutError), max_tries=5)
def fetch():
...
| Argument | Type | Default | Description |
|---|---|---|---|
wait_gen |
WaitGenerator | - | Wait strategy (expo, constant, fibo, etc.) |
exception |
type or tuple[type] | - | Exception class(es) to retry on |
max_tries |
int or Callable[[], int] | None | Maximum number of attempts |
max_time |
float, timedelta, or Callable | None | Maximum total elapsed time |
jitter |
Jitterer or None | full_jitter |
Jitter function |
giveup |
Callable[[Exception], bool or float] | lambda e: False |
Stop retrying for matching exceptions; return float to override wait |
on_success |
Handler or list | None | Called after successful attempt |
on_backoff |
Handler or list | None | Called before each retry |
on_giveup |
Handler or list | None | Called when retries exhausted |
on_attempt |
Handler or list | None | Called before each attempt |
before_sleep |
Handler or list | None | Called before sleeping |
before |
Handler or list | None | Called before each attempt (lower-level than on_attempt) |
after |
Handler or list | None | Called after each attempt (lower-level than on_success/on_giveup) |
retry_error_callback |
Callable[[dict], Any] | None | Called when retry gives up instead of raising |
raise_on_giveup |
bool | True | Raise final exception when giving up |
logger |
str or Logger | "backon" | Logger name or instance |
backoff_log_level |
int | logging.INFO |
Log level for backoff messages |
giveup_log_level |
int | logging.ERROR |
Log level for giveup messages |
sleep |
Callable[[float], Any] | None | Custom sleep function |
**wait_gen_kwargs |
varies | - | Extra kwargs passed to the wait generator (e.g. base=3, interval=0.5) |
@on_predicate
Retry while the predicate matches the return value.
@backon.on_predicate(backon.constant, predicate=lambda x: x is None, max_tries=5)
def poll():
...
Accepts all parameters from on_exception except exception, giveup, and raise_on_giveup. Adds:
| Argument | Type | Default | Description |
|---|---|---|---|
predicate |
Callable[[Any], bool] | operator.not_ |
Retry when this returns True for the return value |
retry()
result = backon.retry(
target=my_function,
wait_gen=backon.expo,
exception=ValueError,
max_tries=3,
)
Accepts all parameters from on_exception plus on_predicate extras, plus:
| Argument | Type | Default | Description |
|---|---|---|---|
condition |
RetryCondition | None | Advanced retry condition object |
stop |
Stop | None | Advanced stop condition object |
name |
str | "" | Identifier for the retry call |
**wait_gen_kwargs |
varies | - | Extra kwargs passed to the wait generator |
If target is a coroutine function, retry() returns a coroutine. Otherwise it returns the result synchronously.
Retrying (Context Manager)
with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
r.call(my_function)
async with backon.Retrying(backon.constant, exception=ValueError, max_tries=3, interval=0.5) as r:
await r.async_call(my_async_function)
| Method | Description |
|---|---|
call(target, *args, **kwargs) |
Execute synchronously |
async_call(target, *args, **kwargs) |
Execute asynchronously |
copy() |
Return a modified copy of the Retrying instance |
statistics |
Property returning dict with attempt_number, elapsed, idle_for, start_time |
call_state |
Property returning the current RetryCallState |
enabled |
Property to enable/disable retry per-instance |
Arguments: Same as retry(), plus enabled (default True).
RetryingCaller / AsyncRetryingCaller
A callable object with pre-bound exception type via .on().
caller = backon.RetryingCaller(backon.expo, max_tries=3)
caller = caller.on(ValueError)
result = caller(my_function, arg1, arg2)
Async variant of RetryingCaller:
caller = backon.AsyncRetryingCaller(backon.expo, max_tries=3).on(ValueError)
result = await caller(my_async_function, arg1, arg2)
| Method | Description |
|---|---|
.on(exception) |
Return a copy bound to the given exception type |
.copy() |
Return a modified copy |
.__call__(target, *args, **kwargs) |
Execute with retry |
Wait Generators
All wait generators are callables that produce a sequence of wait times. Pass extra kwargs (e.g. interval=0.5, base=3) as **wait_gen_kwargs to decorators and functions.
| Generator | Signature | Description |
|---|---|---|
expo |
(base=2, factor=1, max_value=None) |
Exponential backoff: factor * base^n |
constant |
(interval=1) |
Fixed interval; accepts float or Sequence[float] for varied intervals |
fibo |
(max_value=None) |
Fibonacci sequence: 1, 1, 2, 3, 5, 8, ... |
runtime |
(value=Callable) |
Dynamic wait from return value or exception - useful for Retry-After headers |
decay |
(initial_value=1, decay_factor=1, min_value=None) |
Exponential decay: initial * e^(-t * decay_factor) |
wait_random_exponential |
(multiplier=1, max_value=None, exp_base=2, min_value=0) |
Randomized exponential (uniform random between 0 and the exponential value) |
wait_incrementing |
(start=1, increment=1, max_value=None) |
Linear increment: start + n * increment |
wait_chain |
(*generators) |
Sequentially play through multiple generators |
wait_exception |
(value=Callable) |
Dynamic wait based on the caught exception |
wait_random |
(min=0, max=1) |
Uniform random wait between min and max |
wait_exponential_jitter |
(initial=1, max=60, exp_base=2, jitter=1) |
Exponential backoff with added random jitter |
wait_none |
() |
Always returns 0 (no wait) |
Composition: Combine wait generators with +:
wait_strategy = backon.expo(base=3) + backon.constant(interval=0.5)
Stop Conditions
Stop conditions determine when retry should cease. They can be composed with | (any) and & (all).
| Condition | Description |
|---|---|
stop_after_attempt(max_attempts) |
Stop after N attempts |
stop_after_delay(max_delay) |
Stop after total elapsed time exceeds max_delay seconds |
stop_before_delay(max_delay) |
Stop if the next wait would exceed max_delay |
stop_all(*stops) |
Stop when all sub-conditions are met |
stop_any(*stops) |
Stop when any sub-condition is met |
stop_never() |
Never stop (retry indefinitely) |
stop_when_event_set(event) |
Stop when a threading.Event is set |
from backon import stop_after_attempt, stop_after_delay, stop_any
stop = stop_after_attempt(5) | stop_after_delay(30.0)
Retry Conditions
Retry conditions determine whether a retry should happen. They can be composed with | and &.
| Condition | Description |
|---|---|
retry_if_exception_type(*types) |
Retry if exception is an instance of given type(s) |
retry_if_exception(predicate) |
Retry if the exception matches a custom predicate |
retry_if_exception_message(message, match=None) |
Retry if exception message contains a string (or matches regex with match="re") |
retry_if_result(predicate) |
Retry if the return value matches a predicate |
retry_if_not_result(predicate) |
Retry if the return value does NOT match a predicate |
retry_all(*conditions) |
Retry only when all conditions pass |
retry_any(*conditions) |
Retry when any condition passes |
retry_always() |
Always retry |
retry_never() |
Never retry |
from backon import retry_if_exception_type, retry_if_exception_message, retry_all
condition = retry_all(
retry_if_exception_type(HTTPError),
retry_if_exception_message("429"),
)
Jitter
@backon.on_exception(backon.expo, ValueError, jitter=backon.full_jitter)
def f():
...
| Jitter | Effect |
|---|---|
backon.full_jitter |
Random value between 0 and the calculated wait time |
backon.random_jitter |
Adds random() to the calculated wait time (~+0.5s on average) |
None |
No jitter (deterministic waits) |
Handlers
Handlers receive a details dict with contextual information:
def handler(details):
print(f"Attempt {details['tries']}, elapsed {details['elapsed']:.2f}s")
@backon.on_exception(
backon.expo,
ValueError,
max_tries=3,
on_attempt=handler,
on_backoff=handler,
on_success=handler,
on_giveup=handler,
)
def f():
...
Available keys in details:
| Key | Available in |
|---|---|
target |
All |
args, kwargs |
All |
tries |
All |
elapsed |
All |
value |
on_success, on_backoff, on_giveup |
exception |
on_backoff, on_giveup |
wait |
on_backoff, before_sleep |
Global Toggle
Useful in tests to disable retry logic globally:
backon.disable() # skip retry, call function directly
backon.enable() # re-enable retry
Per-instance toggle via Retrying.enabled:
r = backon.Retrying(backon.expo, exception=ValueError, max_tries=3)
r.enabled = False
result = r.call(fn) # no retry
Async Support
All three APIs work with async functions transparently:
@backon.on_exception(backon.expo, ValueError, max_tries=3)
async def fetch():
return await api.call()
result = await backon.retry(fetch, backon.expo, exception=ValueError, max_tries=3)
async with backon.Retrying(backon.expo, exception=ValueError, max_tries=3) as r:
result = await r.async_call(fetch)
Custom Sleep
Replace the default sleep for testing or special environments:
@backon.on_exception(
backon.expo,
ValueError,
max_tries=3,
sleep=lambda s: print(f"waiting {s}s"),
)
def f():
...
# With asyncio.Event for testing
import asyncio
event = asyncio.Event()
@backon.on_exception(
backon.expo,
ValueError,
max_tries=3,
sleep=backon.sleep_using_event(event),
)
async def f():
...
Advanced Features
Circuit Breaker
Circuit breaker with three states: CLOSED (normal), OPEN (failing), HALF_OPEN (testing recovery).
from backon._circuit_breaker import CircuitBreaker, BreakerRetrying, CircuitOpenError
breaker = BreakerRetrying(
backon.expo,
max_tries=3,
breaker=CircuitBreaker(
failure_threshold=5,
recovery_timeout=60.0,
half_open_max_calls=1,
),
)
try:
result = breaker.call(fetch)
except CircuitOpenError:
print("Circuit is open, skipping request")
CircuitBreaker parameter |
Default | Description |
|---|---|---|
failure_threshold |
5 | Consecutive failures before opening the circuit |
recovery_timeout |
60.0 | Seconds before transitioning from OPEN to HALF_OPEN |
half_open_max_calls |
1 | Allowed calls in HALF_OPEN state before fully closing |
name |
"" | Identifier for the breaker |
Hedging
Run multiple retry attempts concurrently and return the first success.
from backon._hedging import hedge, HedgingRetrying
# Functional
result = hedge(fetch, backon.expo, max_hedge=3)
# Decorator
@backon.on_hedge(backon.expo, max_hedge=3)
def fetch():
...
# Context manager
with HedgingRetrying(backon.expo, max_hedge=3) as h:
result = h.call(fetch)
| Parameter | Default | Description |
|---|---|---|
max_hedge |
3 | Number of concurrent hedged requests |
timeout |
None | Maximum time to wait for any hedge |
on_hedge |
None | Callback when a hedge request is sent |
Prometheus / OpenTelemetry Metrics
Optional Prometheus and OpenTelemetry metrics. Requires prometheus_client or opentelemetry-api to be installed.
from backon._instrumentation import PrometheusMetrics, OTelMetrics, set_metrics_collector
# Prometheus
set_metrics_collector(PrometheusMetrics())
# OpenTelemetry
set_metrics_collector(OTelMetrics(meter_name="myapp.backon"))
Metrics collected:
backon_retry_attempts_total(attempts, labeled by target and exception type)backon_retry_success_total(successes)backon_retry_failure_total(failures)backon_circuit_breaker_open_total/backon_circuit_breaker_close_totalbackon_hedge_requests_totalbackon.retry.attempt_duration(histogram, OTel only)
Testing Module
from backon._testing import (
disable_retries,
enable_retries,
test_config,
limit_retries,
remove_backoff,
assert_retried,
assert_not_retried,
)
# Context manager that skips retry for a block
with disable_retries():
result = fetch()
# Limit max retries in tests
with limit_retries(2):
fetch()
# Remove backoff delay entirely
with remove_backoff():
fetch()
# Assert the function was retried N times
assert_retried(fetch, expected_tries=3)
Trio Support
Retry with the trio async framework.
Comments
No comments yet. Start the discussion.