Store Backends
1) In-Memory
MemoryStore is essentially an in-memory LRU cache with expiration times.
It is thread-safe and can be used for rate limiting within a single process.
By default, Throttled initializes a global MemoryStore instance
with a maximum capacity of 1024,
so you don't usually need to create one manually.
Note that throttled.store.MemoryStore and throttled.asyncio.store.MemoryStore are built on
threading.RLock and asyncio.Lock respectively, so synchronous and asynchronous usage
each get their own independent global instance.
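To make the "LRU cache with expiration time" description more concrete, here is a minimal sketch of that general idea using only the standard library. It is not throttled's actual MemoryStore; the class name TTLLRUCache, its methods, and the injectable clock parameter are all hypothetical and exist only for illustration.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Callable, Optional


class TTLLRUCache:
    """Hypothetical illustration only; NOT throttled's MemoryStore."""

    def __init__(self, max_size: int = 1024,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._max_size = max_size
        self._clock = clock
        # A re-entrant lock guards every read and write.
        self._lock = threading.RLock()
        # key -> (value, expires_at); least recently used entries come first.
        self._data: OrderedDict = OrderedDict()

    def set(self, key: str, value: Any, ttl: float) -> None:
        with self._lock:
            self._data[key] = (value, self._clock() + ttl)
            self._data.move_to_end(key)
            # Evict least recently used entries once over capacity.
            while len(self._data) > self._max_size:
                self._data.popitem(last=False)

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            item = self._data.get(key)
            if item is None:
                return None
            value, expires_at = item
            if self._clock() >= expires_at:
                # Expired entries are dropped lazily on access.
                del self._data[key]
                return None
            self._data.move_to_end(key)  # mark as most recently used
            return value
```

With a capacity of 2, writing a third key evicts whichever of the first two was used least recently, and any key read after its expiration time behaves as if it had never been stored.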
Different instances mean different storage spaces. If you want to limit the same key in several places
in your program, make sure that each Throttled receives the same
MemoryStore instance and uses the same
Quota configuration.
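The effect of sharing versus not sharing a store can be sketched with a toy model. ToyStore and ToyLimiter below are hypothetical stand-ins written only for this illustration (they ignore time windows entirely) and are not part of throttled.

```python
class ToyStore:
    """Hypothetical stand-in for a store: a private dict of counters."""

    def __init__(self) -> None:
        self.counters = {}


class ToyLimiter:
    """Hypothetical limiter: allows `limit` calls per key, counted in `store`."""

    def __init__(self, key: str, limit: int, store: ToyStore) -> None:
        self.key = key
        self.limit = limit
        self.store = store

    def allow(self) -> bool:
        used = self.store.counters.get(self.key, 0)
        if used >= self.limit:
            return False
        self.store.counters[self.key] = used + 1
        return True


# Same key, same store: the second caller sees the first caller's usage.
shared = ToyStore()
ping = ToyLimiter("ping-pong", limit=1, store=shared)
pong = ToyLimiter("ping-pong", limit=1, store=shared)
shared_results = (ping.allow(), pong.allow())

# Same key, separate stores: each limiter counts on its own.
ping_alone = ToyLimiter("ping-pong", limit=1, store=ToyStore())
pong_alone = ToyLimiter("ping-pong", limit=1, store=ToyStore())
separate_results = (ping_alone.allow(), pong_alone.allow())

print(shared_results)    # (True, False)
print(separate_results)  # (True, True)
```

In the separate-store case both calls pass even though they use the same key, which is the situation the paragraph above warns about.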
The following example uses MemoryStore as the storage backend and
throttles the same key in both ping and pong:
from throttled import Throttled, rate_limiter, store

# Use MemoryStore as the storage backend.
mem_store = store.MemoryStore()


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def ping() -> str:
    return "ping"


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def pong() -> str:
    return "pong"


def demo():
    # >> ping
    ping()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    pong()


if __name__ == "__main__":
    demo()

The asynchronous equivalent imports from throttled.asyncio:

import asyncio

from throttled.asyncio import Throttled, rate_limiter, store

# Use MemoryStore as the storage backend.
mem_store = store.MemoryStore()


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
async def ping() -> str:
    return "ping"


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
async def pong() -> str:
    return "pong"


async def demo():
    # >> ping
    await ping()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    await pong()


if __name__ == "__main__":
    asyncio.run(demo())

2) Redis
RedisStore is built on redis-py and
can be used for rate limiting in a distributed environment.
It supports the following arguments:
server: Standard Redis URL.
options: Redis connection configuration. Supports all configuration items of redis-py; see RedisStore Options.
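As a rough guide to what a Redis URL passed as server contains, the sketch below splits one into its parts with the standard library. It assumes the common redis://[username:password@]host:port/db layout; the helper name describe_redis_url is hypothetical, and this is not how RedisStore itself parses the value.

```python
from urllib.parse import urlsplit


def describe_redis_url(url: str) -> dict:
    """Hypothetical helper: break a Redis URL into its components."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        # The path holds the database index; an empty path is treated as 0 here.
        "db": int(parts.path.lstrip("/") or 0),
        "password": parts.password,
    }


info = describe_redis_url("redis://127.0.0.1:6379/0")
print(info)
```

For the URL used in the examples on this page, this yields host 127.0.0.1, port 6379, database 0, and no password.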
The following example uses RedisStore as the storage backend:
from throttled import RateLimiterType, Throttled, rate_limiter, store


@Throttled(
    key="/api/products",
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_min(1),
    # Use RedisStore as the storage backend.
    store=store.RedisStore(server="redis://127.0.0.1:6379/0", options={"PASSWORD": ""}),
)
def products() -> list:
    return [{"name": "iPhone"}, {"name": "MacBook"}]


def demo():
    products()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    products()


if __name__ == "__main__":
    demo()

The asynchronous equivalent imports from throttled.asyncio:

import asyncio

from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, store


@Throttled(
    key="/api/products",
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_min(1),
    # Use RedisStore as the storage backend.
    store=store.RedisStore(server="redis://127.0.0.1:6379/0", options={"PASSWORD": ""}),
)
async def products() -> list:
    return [{"name": "iPhone"}, {"name": "MacBook"}]


async def demo():
    await products()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    await products()


if __name__ == "__main__":
    asyncio.run(demo())
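
The Redis examples above select RateLimiterType.TOKEN_BUCKET with a quota of one request per minute, and the second call is rejected. The sketch below illustrates the general token-bucket idea with a purely local, in-process class. It is not throttled's implementation: the TokenBucket class is hypothetical, and it assumes a bucket capacity of 1 to mirror the per_min(1) behavior shown in the example output.

```python
class TokenBucket:
    """Hypothetical, in-process token bucket; NOT throttled's implementation."""

    def __init__(self, rate_per_sec: float, capacity: int, now: float = 0.0) -> None:
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = float(capacity)  # start with a full bucket
        self.updated_at = now

    def allow(self, now: float) -> bool:
        # Refill in proportion to elapsed time, never beyond capacity.
        elapsed = now - self.updated_at
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated_at = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


# One token per 60 seconds, with room for a single token (assumed capacity).
bucket = TokenBucket(rate_per_sec=1 / 60, capacity=1)
results = [bucket.allow(0.0), bucket.allow(1.0), bucket.allow(61.0)]
print(results)  # [True, False, True]
```

The first call spends the only token, the call one second later finds the bucket almost empty, and a call a minute later succeeds once a full token has been refilled. In a distributed setup this state would live in Redis rather than in a Python object, so that all processes share it.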