Hooks#

The Hook system provides a flexible middleware pattern for extending rate limiting behavior. Hooks can be used for:

  • Observability: Monitor rate limiting events, record metrics

  • Timing: Measure duration of rate limit checks

  • Exception handling: Gracefully handle errors in the rate limiting flow

  • Custom logic: Add pre/post processing around rate limit checks

1) Basic Usage#

from collections.abc import Callable

from throttled import Hook, HookContext, RateLimitResult, Throttled


class LoggingHook(Hook):
    def on_limit(  # noqa: PLR6301
        self,
        call_next: Callable[[], RateLimitResult],
        context: HookContext,
    ) -> RateLimitResult:
        # Before rate limit check
        print(f"Checking rate limit for {context.key}")

        # Execute rate limit check
        result = call_next()

        # After rate limit check
        status = "denied" if result.limited else "allowed"
        print(f"[{context.key}] {status} - remaining: {result.state.remaining}")

        return result


throttle = Throttled(key="/api/users", quota="10/s", hooks=[LoggingHook()])


def main() -> None:
    result = throttle.limit()
    # Output:
    # Checking rate limit for /api/users
    # [/api/users] allowed - remaining: 9
    print(f"limited={result.limited}")


if __name__ == "__main__":
    main()
import asyncio
from collections.abc import Awaitable, Callable

from throttled.asyncio import Hook, HookContext, RateLimitResult, Throttled


class LoggingHook(Hook):
    async def on_limit(  # noqa: PLR6301
        self,
        call_next: Callable[[], Awaitable[RateLimitResult]],
        context: HookContext,
    ) -> RateLimitResult:
        # Before rate limit check
        print(f"Checking rate limit for {context.key}")

        # Execute rate limit check
        result = await call_next()

        # After rate limit check
        status = "denied" if result.limited else "allowed"
        print(f"[{context.key}] {status} - remaining: {result.state.remaining}")

        return result


throttle = Throttled(key="/api/users", quota="10/s", hooks=[LoggingHook()])


async def main() -> None:
    result = await throttle.limit()
    # Output:
    # Checking rate limit for /api/users
    # [/api/users] allowed - remaining: 9
    print(f"limited={result.limited}")


if __name__ == "__main__":
    asyncio.run(main())

2) Middleware Pattern#

Hooks follow the middleware pattern (Chain of Responsibility). When you register multiple hooks, they are executed in order, wrapping the rate limit operation:

hooks = [A, B]

Execution order:
A.on_limit (before) → B.on_limit (before) → rate_limit → B.on_limit (after) → A.on_limit (after)

This allows each hook to:

  1. Execute logic before the rate limit check

  2. Call call_next() to continue the chain

  3. Execute logic after the rate limit check

  4. Inspect or modify the result

3) HookContext#

The HookContext is an immutable dataclass containing information about the rate limit request:

Attribute

Type

Description

key

str

The identifier being rate-limited (e.g., user_id, IP address)

cost

int

The cost of the current request

algorithm

str

The algorithm used (e.g., “token_bucket”, “fixed_window”)

store_type

str

The storage backend type (e.g., “memory”, “redis”)

Note

HookContext does not include the result. The result is obtained by calling call_next().

4) Hook Type Validation#

Throttled validates hook types at initialization. Sync Throttled only accepts Hook instances, and async Throttled only accepts AsyncHook instances. Passing an invalid type raises TypeError:

from throttled import Throttled, Hook, per_sec
from throttled.asyncio import Throttled as AsyncThrottled
from throttled.asyncio.hooks import Hook as AsyncHook

# ✅ Correct: sync Hook with sync Throttled
Throttled(key="/api", quota=per_sec(10), hooks=[MySyncHook()])

# ✅ Correct: async Hook with async Throttled
AsyncThrottled(key="/api", quota=per_sec(10), hooks=[MyAsyncHook()])

# ❌ TypeError: async Hook with sync Throttled
Throttled(key="/api", quota=per_sec(10), hooks=[MyAsyncHook()])

# ❌ TypeError: non-hook object
Throttled(key="/api", quota=per_sec(10), hooks=["not a hook"])

Note

Hooks are stored as a tuple internally for immutability after construction.

5) Creating Custom Hooks#

To create a custom hook, inherit from Hook and implement the on_limit method:

import time
from collections.abc import Callable

from throttled import Hook, HookContext, RateLimitResult, Throttled


class TimingHook(Hook):
    def on_limit(  # noqa: PLR6301
        self,
        call_next: Callable[[], RateLimitResult],
        context: HookContext,
    ) -> RateLimitResult:
        start = time.perf_counter()
        result = call_next()
        duration = time.perf_counter() - start

        print(f"Rate limit check took {duration:.4f}s")
        return result


throttle = Throttled(key="/api/users", quota="10/s", hooks=[TimingHook()])


def main() -> None:
    result = throttle.limit()
    print(f"limited={result.limited}")


if __name__ == "__main__":
    main()
import asyncio
import time
from collections.abc import Awaitable, Callable

from throttled.asyncio import Hook, HookContext, RateLimitResult, Throttled


class TimingHook(Hook):
    async def on_limit(  # noqa: PLR6301
        self,
        call_next: Callable[[], Awaitable[RateLimitResult]],
        context: HookContext,
    ) -> RateLimitResult:
        start = time.perf_counter()
        result = await call_next()
        duration = time.perf_counter() - start

        print(f"Rate limit check took {duration:.4f}s")
        return result


throttle = Throttled(key="/api/users", quota="10/s", hooks=[TimingHook()])


async def main() -> None:
    result = await throttle.limit()
    print(f"limited={result.limited}")


if __name__ == "__main__":
    asyncio.run(main())

Best Practices#

  1. Always call call_next(): The hook must call call_next() to continue the chain and return its result.

    class MyHook(Hook):
        def on_limit(self, call_next, context):
            # Must call call_next() and return its result
            result = call_next()
            return result
    
  2. Handle exceptions gracefully: If your hook raises an exception, it will be caught and the hook is skipped entirely - the chain continues by calling the next hook directly. The exception is logged via logger.exception() using per-module loggers (throttled.hooks for sync, throttled.asyncio.hooks for async), so you can capture hook failures through standard Python logging configuration. To ensure your hook doesn’t get skipped, wrap risky operations in try/except:

    class SafeHook(Hook):
        def on_limit(self, call_next, context):
            # Pre-processing with error handling
            try:
                self.validate_request(context)
            except Exception as e:
                logging.warning(f"Pre-processing failed: {e}")
    
            # call_next() outside try/except - always executes
            result = call_next()
    
            # Post-processing with error handling
            try:
                self.send_metrics(result, context)
            except Exception as e:
                logging.warning(f"Post-processing failed: {e}")
    
            return result
    
  3. Keep hooks fast: The sync on_limit method runs synchronously. For slow operations (HTTP calls, database writes), use a queue or background task. Alternatively, use throttled.asyncio.Hook for native async support.

    class AsyncMetricsHook(Hook):
        def __init__(self, queue):
            self.queue = queue
    
        def on_limit(self, call_next, context):
            result = call_next()
            # Non-blocking: add to queue for async processing
            self.queue.put_nowait({
                "key": context.key,
                "limited": result.limited,
                "timestamp": time.time(),
            })
            return result
    
  4. Use multiple hooks: You can register multiple hooks for different purposes.

    throttle = Throttled(
        key="/api",
        quota=per_sec(100),
        hooks=[
            TimingHook(),
            LoggingHook(),
            MetricsHook(statsd_client),
        ],
    )
    

6) Built-in Hooks#

throttled-py provides the following built-in hooks:

Hook

Description

OTelHook

OpenTelemetry metrics integration for monitoring rate limiting events.

OTelHook (async)

Async version of OTelHook for use with throttled.asyncio.Hook.