Wait & Retry

Wait & Retry#

By default, Throttled returns RateLimitResult immediately.

To enable wait-and-retry behavior, you can use the timeout parameter.

Throttled will wait according to the RateLimitState.retry_after and retry automatically.

In Function Call mode, the last retried RateLimitResult is returned:

import time

from throttled import Throttled, utils


def main() -> None:
    """Show wait-and-retry in Function Call mode using the synchronous API."""
    # 1 token is produced per second, and at most 1 request may burst.
    limiter = Throttled(key="key", quota="1/s burst 1")

    # The first call uses up the burst quota, so it must be allowed.
    first = limiter.limit()
    assert not first.limited

    def report(elapsed, start, end):
        # Only the elapsed duration is printed; start/end are ignored.
        print(f"elapsed: {elapsed:.2f} seconds")

    stopwatch = utils.Timer(clock=time.time, callback=report)

    with stopwatch:
        # With wait-retry enabled, the call waits for the next available
        # token when the limit has been reached.
        # > elapsed: 1.00 seconds
        waited = limiter.limit(timeout=1)
        assert not waited.limited

    with stopwatch:
        # When the required wait (retry_after) is longer than the timeout,
        # the call returns immediately with the last RateLimitResult.
        # > elapsed: 0.00 seconds
        rejected = limiter.limit(timeout=0.5)
        assert rejected.limited


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
import asyncio
import time

from throttled.asyncio import Throttled, utils


async def main() -> None:
    """Show wait-and-retry in Function Call mode using the asyncio API."""
    # Allow 1 burst request, producing 1 token per second.
    throttle = Throttled(key="key", quota="1/s burst 1")

    # Consume burst request quota.
    assert not (await throttle.limit()).limited

    timer = utils.Timer(
        clock=time.time,
        callback=lambda elapsed, start, end: print(f"elapsed: {elapsed:.2f} seconds"),
    )
    async with timer:
        # Enabled wait-retry, which will wait for the next available token
        # if the limit is reached.
        # > elapsed: 1.00 seconds
        assert not (await throttle.limit(timeout=1)).limited

    # Use ``async with`` here as well, so both timing blocks in this
    # coroutine enter the timer the same way.
    async with timer:
        # If the required wait (retry_after) is longer than the timeout,
        # the call returns immediately with the last RateLimitResult.
        # > elapsed: 0.00 seconds
        assert (await throttle.limit(timeout=0.5)).limited


# Run the async example on a fresh event loop when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

In the Decorator and Context Manager modes, LimitedError will be raised if the request is not allowed after the timeout:

import time

from throttled import RateLimiterType, Throttled


@Throttled(
    key="ping",
    using=RateLimiterType.GCRA.value,
    quota="2/s burst 2",
    # ⏳ Set timeout to 0.5 second, which allows waiting for retry.
    # In Decorator mode, LimitedError is raised if the request is still
    # not allowed after the timeout.
    timeout=0.5,
)
def ping() -> str:
    """Return "pong"; calls are rate limited by the ``Throttled`` decorator."""
    return "pong"


def main() -> None:
    """Call ``ping`` 5 times in sequence and print when each call completes."""
    began = time.time()
    for request_no in range(1, 6):
        ping()  # type: ignore[call-arg]
        print(f"Request {request_no} completed at {time.time() - began:.2f}s")

    # Report the overall wall-clock duration of the 5 requests.
    print(f"\nTotal time for 5 requests at 2/sec: {time.time() - began:.2f}s")


# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    main()
import asyncio
import time

from throttled.asyncio import RateLimiterType, Throttled


@Throttled(
    key="ping",
    using=RateLimiterType.GCRA.value,
    quota="2/s burst 2",
    # ⏳ Set timeout to 0.5 second, which allows waiting for retry.
    # In Decorator mode, LimitedError is raised if the request is still
    # not allowed after the timeout.
    timeout=0.5,
)
async def ping() -> str:
    """Return "pong"; calls are rate limited by the ``Throttled`` decorator."""
    return "pong"


async def main() -> None:
    """Await ``ping`` 5 times in sequence and print when each call completes."""
    began = time.time()
    for request_no in range(1, 6):
        await ping()
        finished_at = time.time() - began
        print(f"Request {request_no} completed at {finished_at:.2f}s")

    # Report the overall wall-clock duration of the 5 requests.
    total_time = time.time() - began
    print(f"\nTotal time for 5 requests at 2/sec: {total_time:.2f}s")


# Run the async example on a fresh event loop when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

In the above example, quota="2/s burst 2" (i.e. per_sec(2, burst=2)) allows 2 requests per second, with a burst of 2 requests (the bucket's capacity). In other words, the burst is used up after the first 2 requests. If timeout>=0.5 is set, the above example will complete all 5 requests in about 1.5 seconds (the burst is consumed immediately, and the remaining 3 requests are served as tokens are refilled over the following 1.5s):

------------- Burst---------------------
Request 1 completed at 0.00s
Request 2 completed at 0.00s
----------------------------------------
-- Refill: 1 token per 0.5 seconds -----
Request 3 completed at 0.50s
Request 4 completed at 1.00s
Request 5 completed at 1.50s
-----------------------------------------
Total time for 5 requests at 2/sec: 1.50s

Wait & Retry is most effective for smoothing out request rates, and you can feel its effect through the following example:

from throttled import RateLimiterType, Throttled, utils

# Shared limiter used by ``call_api``: GCRA algorithm with a quota of
# 100 requests per second and a burst of 100.
throttle = Throttled(
    using=RateLimiterType.GCRA.value,
    quota="100/s burst 100",
    # ⏳ Set timeout to 1 second, which allows waiting for retry,
    # and returns the last RateLimitResult if the wait exceeds 1 second.
    timeout=1,
)


def call_api() -> bool:
    """Consume one unit of the "/ping" quota and return True if it was denied."""
    # ⬆️⏳ A timeout passed to the call overrides the global timeout.
    return throttle.limit("/ping", cost=1, timeout=1).limited


if __name__ == "__main__":
    # 👇 Sample output; the actual QPS is close to the preset quota (100 req/s):
    # ✅ Total: 1000, 🕒 Latency: 35.8103 ms/op, 🚀 Throughput: 111 req/s (--)
    # ❌ Denied: 8 requests
    bench = utils.Benchmark()
    # Each call yields True when the request was denied, so the sum is
    # the number of denied requests.
    outcomes = bench.concurrent(call_api, 1_000, workers=4)
    print(f"❌ Denied: {sum(outcomes)} requests")
import asyncio

from throttled.asyncio import RateLimiterType, Throttled, utils

# Shared limiter used by ``call_api``: GCRA algorithm with a quota of
# 100 requests per second and a burst of 100.
throttle = Throttled(
    using=RateLimiterType.GCRA.value,
    quota="100/s burst 100",
    # ⏳ Set timeout to 1 second, which allows waiting for retry,
    # and returns the last RateLimitResult if the wait exceeds 1 second.
    timeout=1,
)


async def call_api() -> bool:
    """Consume one unit of the "/ping" quota and return True if it was denied."""
    # ⬆️⏳ A timeout passed to the call overrides the global timeout.
    outcome = await throttle.limit("/ping", cost=1, timeout=1)
    denied = outcome.limited
    return denied


async def main() -> None:
    """Run 1,000 ``call_api`` invocations with 4 workers and print the denied count."""
    bench = utils.Benchmark()
    # Each call yields True when the request was denied, so the sum is
    # the number of denied requests.
    outcomes = await bench.async_concurrent(call_api, 1_000, workers=4)
    print(f"❌ Denied: {sum(outcomes)} requests")


# Run the async benchmark on a fresh event loop when executed as a script.
if __name__ == "__main__":
    # 👇 Sample output; the actual QPS is close to the preset quota (100 req/s):
    # ✅ Total: 1000, 🕒 Latency: 35.8103 ms/op, 🚀 Throughput: 111 req/s (--)
    # ❌ Denied: 8 requests
    asyncio.run(main())