Wait & Retry
By default, Throttled returns RateLimitResult immediately.
To enable wait-and-retry behavior, you can use the timeout parameter.
Throttled will wait according to the
RateLimitState.retry_after and retry automatically.
In Function Call mode, the call returns
the last retried RateLimitResult:
import time
from throttled import Throttled, utils
def main() -> None:
    """Demonstrate wait-and-retry in Function Call mode.

    The first call uses up the burst, the second waits for the next token,
    and the third gives up immediately because its timeout is too short.
    """
    # Allow 1 burst request, producing 1 token per second.
    throttle = Throttled(key="key", quota="1/s burst 1")

    # Consume burst request quota.
    assert not throttle.limit().limited

    timer = utils.Timer(
        clock=time.time,
        callback=lambda elapsed, start, end: print(f"elapsed: {elapsed:.2f} seconds"),
    )

    with timer:
        # Enable wait-retry, which waits for the next available token
        # if the limit is reached.
        # > elapsed: 1.00 seconds
        assert not throttle.limit(timeout=1).limited

    with timer:
        # If the timeout is exceeded, the last RateLimitResult is returned.
        # timeout < ``RateLimitState.retry_after``, so it returns immediately.
        # > elapsed: 0.00 seconds
        assert throttle.limit(timeout=0.5).limited


if __name__ == "__main__":
    main()
import asyncio
import time
from throttled.asyncio import Throttled, utils
async def main() -> None:
    """Demonstrate wait-and-retry in Function Call mode (asyncio variant).

    The first call uses up the burst, the second waits for the next token,
    and the third gives up immediately because its timeout is too short.
    """
    # Allow 1 burst request, producing 1 token per second.
    throttle = Throttled(key="key", quota="1/s burst 1")

    # Consume burst request quota.
    assert not (await throttle.limit()).limited

    timer = utils.Timer(
        clock=time.time,
        callback=lambda elapsed, start, end: print(f"elapsed: {elapsed:.2f} seconds"),
    )

    async with timer:
        # Enable wait-retry, which waits for the next available token
        # if the limit is reached.
        # > elapsed: 1.00 seconds
        assert not (await throttle.limit(timeout=1)).limited

    async with timer:
        # If the timeout is exceeded, the last RateLimitResult is returned.
        # timeout < ``RateLimitState.retry_after``, so it returns immediately.
        # > elapsed: 0.00 seconds
        assert (await throttle.limit(timeout=0.5)).limited


if __name__ == "__main__":
    asyncio.run(main())
In the Decorator and Context Manager modes,
LimitedError will be raised if the request is not allowed after the timeout:
import time
from throttled import RateLimiterType, Throttled
@Throttled(
    key="ping",
    using=RateLimiterType.GCRA.value,
    quota="2/s burst 2",
    # ⏳ Set timeout to 0.5 second, which allows waiting for retry.
    # In Decorator mode, LimitedError is raised if the request is still
    # not allowed after the timeout.
    timeout=0.5,
)
def ping() -> str:
    """Return ``"pong"``; every call goes through the ``Throttled`` decorator."""
    return "pong"
def main() -> None:
    """Call ``ping`` five times in a row and print when each call completes."""
    # Make 5 sequential requests.
    start_time = time.time()
    for i in range(5):
        ping()  # type: ignore[call-arg]
        print(f"Request {i + 1} completed at {time.time() - start_time:.2f}s")

    total_time = time.time() - start_time
    print(f"\nTotal time for 5 requests at 2/sec: {total_time:.2f}s")


if __name__ == "__main__":
    main()
import asyncio
import time
from throttled.asyncio import RateLimiterType, Throttled
@Throttled(
    key="ping",
    using=RateLimiterType.GCRA.value,
    quota="2/s burst 2",
    # ⏳ Set timeout to 0.5 second, which allows waiting for retry.
    # In Decorator mode, LimitedError is raised if the request is still
    # not allowed after the timeout.
    timeout=0.5,
)
async def ping() -> str:
    """Return ``"pong"``; every call goes through the ``Throttled`` decorator."""
    return "pong"
async def main() -> None:
    """Await ``ping`` five times in a row and print when each call completes."""
    # Make 5 sequential requests.
    start_time = time.time()
    for i in range(5):
        await ping()
        print(f"Request {i + 1} completed at {time.time() - start_time:.2f}s")

    print(f"\nTotal time for 5 requests at 2/sec: {time.time() - start_time:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
In the above example, quota="2/s burst 2" allows 2 requests per second plus a burst of
2 requests (the bucket's capacity). In other words, the burst is used up by the first 2 requests.
If timeout>=0.5 is set, the above example completes all requests in 1.5 seconds (the burst is consumed
immediately, and the remaining 3 requests are served one every 0.5s over the following 1.5s):
------------- Burst---------------------
Request 1 completed at 0.00s
Request 2 completed at 0.00s
----------------------------------------
-- Refill: 1 token per 0.5 seconds -----
Request 3 completed at 0.50s
Request 4 completed at 1.00s
Request 5 completed at 1.50s
-----------------------------------------
Total time for 5 requests at 2/sec: 1.50s
Wait & Retry is most effective for smoothing out request rates, and you can feel its effect
through the following example:
from throttled import RateLimiterType, Throttled, utils
# Limiter shared by every call_api() invocation below.
throttle = Throttled(
    quota="100/s burst 100",
    using=RateLimiterType.GCRA.value,
    # ⏳ A 1-second timeout enables waiting for retry; if the wait exceeds
    # 1 second, the last RateLimitResult is returned.
    timeout=1,
)
def call_api() -> bool:
    """Make one rate-limited request for ``"/ping"``.

    Returns:
        The ``limited`` flag of the result, i.e. whether the request was denied.
    """
    # ⬆️⏳ Function call with timeout will override the global timeout.
    result = throttle.limit("/ping", cost=1, timeout=1)
    return result.limited
if __name__ == "__main__":
    # 👇 The actual QPS is close to the preset quota (100 req/s):
    # ✅ Total: 1000, 🕒 Latency: 35.8103 ms/op, 🚀 Throughput: 111 req/s (--)
    # ❌ Denied: 8 requests
    benchmark: utils.Benchmark = utils.Benchmark()
    # Each call_api() result is a bool, so the sum counts the denied requests.
    denied_num: int = sum(benchmark.concurrent(call_api, 1_000, workers=4))
    print(f"❌ Denied: {denied_num} requests")
import asyncio
from throttled.asyncio import RateLimiterType, Throttled, utils
# Limiter shared by every call_api() invocation below.
throttle = Throttled(
    quota="100/s burst 100",
    using=RateLimiterType.GCRA.value,
    # ⏳ A 1-second timeout enables waiting for retry; if the wait exceeds
    # 1 second, the last RateLimitResult is returned.
    timeout=1,
)
async def call_api() -> bool:
    """Make one rate-limited request for ``"/ping"``.

    Returns:
        The ``limited`` flag of the result, i.e. whether the request was denied.
    """
    # ⬆️⏳ Function call with timeout will override the global timeout.
    result = await throttle.limit("/ping", cost=1, timeout=1)
    return result.limited
async def main() -> None:
    """Run 1,000 ``call_api`` requests through the benchmark and print the denied count."""
    benchmark: utils.Benchmark = utils.Benchmark()
    # Each call_api() result is a bool, so the sum counts the denied requests.
    denied_num: int = sum(await benchmark.async_concurrent(call_api, 1_000, workers=4))
    print(f"❌ Denied: {denied_num} requests")


if __name__ == "__main__":
    # 👇 The actual QPS is close to the preset quota (100 req/s):
    # ✅ Total: 1000, 🕒 Latency: 35.8103 ms/op, 🚀 Throughput: 111 req/s (--)
    # ❌ Denied: 8 requests
    asyncio.run(main())