Hooks#
The Hook system provides a flexible middleware pattern for extending rate limiting behavior. Hooks can be used for:
Observability: Monitor rate limiting events, record metrics
Timing: Measure duration of rate limit checks
Exception handling: Gracefully handle errors in the rate limiting flow
Custom logic: Add pre/post processing around rate limit checks
1) Basic Usage#
from collections.abc import Callable
from throttled import Hook, HookContext, RateLimitResult, Throttled
class LoggingHook(Hook):
def on_limit( # noqa: PLR6301
self,
call_next: Callable[[], RateLimitResult],
context: HookContext,
) -> RateLimitResult:
# Before rate limit check
print(f"Checking rate limit for {context.key}")
# Execute rate limit check
result = call_next()
# After rate limit check
status = "denied" if result.limited else "allowed"
print(f"[{context.key}] {status} - remaining: {result.state.remaining}")
return result
throttle = Throttled(key="/api/users", quota="10/s", hooks=[LoggingHook()])
def main() -> None:
result = throttle.limit()
# Output:
# Checking rate limit for /api/users
# [/api/users] allowed - remaining: 9
print(f"limited={result.limited}")
if __name__ == "__main__":
main()
import asyncio
from collections.abc import Awaitable, Callable
from throttled.asyncio import Hook, HookContext, RateLimitResult, Throttled
class LoggingHook(Hook):
async def on_limit( # noqa: PLR6301
self,
call_next: Callable[[], Awaitable[RateLimitResult]],
context: HookContext,
) -> RateLimitResult:
# Before rate limit check
print(f"Checking rate limit for {context.key}")
# Execute rate limit check
result = await call_next()
# After rate limit check
status = "denied" if result.limited else "allowed"
print(f"[{context.key}] {status} - remaining: {result.state.remaining}")
return result
throttle = Throttled(key="/api/users", quota="10/s", hooks=[LoggingHook()])
async def main() -> None:
result = await throttle.limit()
# Output:
# Checking rate limit for /api/users
# [/api/users] allowed - remaining: 9
print(f"limited={result.limited}")
if __name__ == "__main__":
asyncio.run(main())
2) Middleware Pattern#
Hooks follow the middleware pattern (Chain of Responsibility). When you register multiple hooks, they are executed in order, wrapping the rate limit operation:
hooks = [A, B]
Execution order:
A.on_limit (before) → B.on_limit (before) → rate_limit → B.on_limit (after) → A.on_limit (after)
This allows each hook to:
Execute logic before the rate limit check
Call
call_next()to continue the chainExecute logic after the rate limit check
Inspect or modify the result
3) HookContext#
The HookContext is an immutable dataclass containing information about the rate limit request:
Attribute |
Type |
Description |
|---|---|---|
|
|
The identifier being rate-limited (e.g., user_id, IP address) |
|
|
The cost of the current request |
|
|
The algorithm used (e.g., “token_bucket”, “fixed_window”) |
|
|
The storage backend type (e.g., “memory”, “redis”) |
Note
HookContext does not include the result. The result is obtained by calling call_next().
4) Hook Type Validation#
Throttled validates hook types at initialization. Sync Throttled only accepts Hook instances,
and async Throttled only accepts AsyncHook instances. Passing an invalid type raises TypeError:
from throttled import Throttled, Hook, per_sec
from throttled.asyncio import Throttled as AsyncThrottled
from throttled.asyncio.hooks import Hook as AsyncHook
# ✅ Correct: sync Hook with sync Throttled
Throttled(key="/api", quota=per_sec(10), hooks=[MySyncHook()])
# ✅ Correct: async Hook with async Throttled
AsyncThrottled(key="/api", quota=per_sec(10), hooks=[MyAsyncHook()])
# ❌ TypeError: async Hook with sync Throttled
Throttled(key="/api", quota=per_sec(10), hooks=[MyAsyncHook()])
# ❌ TypeError: non-hook object
Throttled(key="/api", quota=per_sec(10), hooks=["not a hook"])
Note
Hooks are stored as a tuple internally for immutability after construction.
5) Creating Custom Hooks#
To create a custom hook, inherit from Hook and implement the on_limit method:
import time
from collections.abc import Callable
from throttled import Hook, HookContext, RateLimitResult, Throttled
class TimingHook(Hook):
def on_limit( # noqa: PLR6301
self,
call_next: Callable[[], RateLimitResult],
context: HookContext,
) -> RateLimitResult:
start = time.perf_counter()
result = call_next()
duration = time.perf_counter() - start
print(f"Rate limit check took {duration:.4f}s")
return result
throttle = Throttled(key="/api/users", quota="10/s", hooks=[TimingHook()])
def main() -> None:
result = throttle.limit()
print(f"limited={result.limited}")
if __name__ == "__main__":
main()
import asyncio
import time
from collections.abc import Awaitable, Callable
from throttled.asyncio import Hook, HookContext, RateLimitResult, Throttled
class TimingHook(Hook):
async def on_limit( # noqa: PLR6301
self,
call_next: Callable[[], Awaitable[RateLimitResult]],
context: HookContext,
) -> RateLimitResult:
start = time.perf_counter()
result = await call_next()
duration = time.perf_counter() - start
print(f"Rate limit check took {duration:.4f}s")
return result
throttle = Throttled(key="/api/users", quota="10/s", hooks=[TimingHook()])
async def main() -> None:
result = await throttle.limit()
print(f"limited={result.limited}")
if __name__ == "__main__":
asyncio.run(main())
Best Practices#
Always call call_next(): The hook must call
call_next()to continue the chain and return its result.class MyHook(Hook): def on_limit(self, call_next, context): # Must call call_next() and return its result result = call_next() return result
Handle exceptions gracefully: If your hook raises an exception, it will be caught and the hook is skipped entirely - the chain continues by calling the next hook directly. The exception is logged via
logger.exception()using per-module loggers (throttled.hooksfor sync,throttled.asyncio.hooksfor async), so you can capture hook failures through standard Python logging configuration. To ensure your hook doesn’t get skipped, wrap risky operations in try/except:class SafeHook(Hook): def on_limit(self, call_next, context): # Pre-processing with error handling try: self.validate_request(context) except Exception as e: logging.warning(f"Pre-processing failed: {e}") # call_next() outside try/except - always executes result = call_next() # Post-processing with error handling try: self.send_metrics(result, context) except Exception as e: logging.warning(f"Post-processing failed: {e}") return result
Keep hooks fast: The sync
on_limitmethod runs synchronously. For slow operations (HTTP calls, database writes), use a queue or background task. Alternatively, usethrottled.asyncio.Hookfor native async support.class AsyncMetricsHook(Hook): def __init__(self, queue): self.queue = queue def on_limit(self, call_next, context): result = call_next() # Non-blocking: add to queue for async processing self.queue.put_nowait({ "key": context.key, "limited": result.limited, "timestamp": time.time(), }) return result
Use multiple hooks: You can register multiple hooks for different purposes.
throttle = Throttled( key="/api", quota=per_sec(100), hooks=[ TimingHook(), LoggingHook(), MetricsHook(statsd_client), ], )
6) Built-in Hooks#
throttled-py provides the following built-in hooks:
Hook |
Description |
|---|---|
OpenTelemetry metrics integration for monitoring rate limiting events. |
|
Async version of |