Store Backends

1) In-Memory

MemoryStore is essentially an in-memory LRU cache with expiration times. It is thread-safe and can be used for rate limiting within a single process.
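To make the "LRU cache with expiration" description concrete, here is a minimal sketch of that kind of structure. It is an illustration only and not MemoryStore's actual implementation: the class name ExpiringLRU, its get/set methods, and the ttl parameter are all hypothetical.

```python
import threading
import time
from collections import OrderedDict
from typing import Any, Optional


class ExpiringLRU:
    """Hypothetical illustration of a thread-safe LRU cache with expiry."""

    def __init__(self, max_size: int = 1024) -> None:
        self._max_size = max_size
        # Maps key -> (value, expires_at); the oldest entry sits at the front.
        self._data = OrderedDict()
        # A re-entrant lock guards every read and write.
        self._lock = threading.RLock()

    def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None:
        expires_at = None if ttl is None else time.monotonic() + ttl
        with self._lock:
            self._data[key] = (value, expires_at)
            self._data.move_to_end(key)  # mark as most recently used
            while len(self._data) > self._max_size:
                self._data.popitem(last=False)  # evict least recently used

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            item = self._data.get(key)
            if item is None:
                return None
            value, expires_at = item
            if expires_at is not None and time.monotonic() >= expires_at:
                del self._data[key]  # expired entries are dropped on read
                return None
            self._data.move_to_end(key)  # a read also counts as a use
            return value


cache = ExpiringLRU(max_size=2)
cache.set("a", 1)
cache.set("b", 2)
cache.get("a")         # touching "a" makes "b" the least recently used
cache.set("c", 3)      # over capacity: "b" is evicted
print(cache.get("b"))  # None
print(cache.get("a"))  # 1
```

The capacity bound is what keeps memory use predictable when many distinct keys are throttled: once the cache is full, the least recently used key loses its state first.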

By default, Throttled initializes a global MemoryStore instance with a maximum capacity of 1024, so you usually don't need to create one manually.

Note that throttled.store.MemoryStore and throttled.asyncio.store.MemoryStore are built on threading.RLock and asyncio.Lock respectively, so the synchronous and asynchronous APIs each have their own independent global instance.

Different instances mean different storage spaces. If you want to limit the same key in different places in your program, make sure that Throttled receives the same MemoryStore instance and uses the same Quota configuration.
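The point about separate storage spaces can be sketched without the library. The snippet below is a deliberately simplified model, assuming a hypothetical CountingStore class and allow function that count hits per key and ignore time windows entirely. It only shows why two store instances cannot enforce one shared limit.

```python
from collections import defaultdict


class CountingStore:
    """Hypothetical stand-in for a store: each instance owns its counters."""

    def __init__(self) -> None:
        self.hits = defaultdict(int)


def allow(store: CountingStore, key: str, limit: int) -> bool:
    """Record one hit for key and report whether it is within the limit."""
    store.hits[key] += 1
    return store.hits[key] <= limit


shared = CountingStore()
print(allow(shared, "ping-pong", limit=1))  # True
print(allow(shared, "ping-pong", limit=1))  # False: same store, same key

separate = CountingStore()
print(allow(separate, "ping-pong", limit=1))  # True: a fresh store starts at zero
```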

The following example uses MemoryStore as the storage backend and throttles the same key across ping and pong:

from throttled import Throttled, rate_limiter, store

# 🌟 Use MemoryStore as the storage backend.
mem_store = store.MemoryStore()


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def ping() -> str:
    return "ping"


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
def pong() -> str:
    return "pong"


def demo():
    # >> ping
    ping()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    pong()


if __name__ == "__main__":
    demo()

Asynchronous version:

import asyncio

from throttled.asyncio import Throttled, rate_limiter, store

# 🌟 Use MemoryStore as the storage backend.
mem_store = store.MemoryStore()


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
async def ping() -> str:
    return "ping"


@Throttled(key="ping-pong", quota=rate_limiter.per_min(1), store=mem_store)
async def pong() -> str:
    return "pong"


async def demo():
    # >> ping
    await ping()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    await pong()


if __name__ == "__main__":
    asyncio.run(demo())

2) Redis

RedisStore is built on redis-py and can be used for rate limiting in a distributed environment.

RedisStore accepts a server URL (server) and a dictionary of connection options (options), both of which appear in the example below.

The following example uses RedisStore as the storage backend:

from throttled import RateLimiterType, Throttled, rate_limiter, store


@Throttled(
    key="/api/products",
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_min(1),
    # 🌟 use RedisStore as storage
    store=store.RedisStore(server="redis://127.0.0.1:6379/0", options={"PASSWORD": ""}),
)
def products() -> list:
    return [{"name": "iPhone"}, {"name": "MacBook"}]


def demo():
    products()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    products()


if __name__ == "__main__":
    demo()

Asynchronous version:

import asyncio

from throttled.asyncio import RateLimiterType, Throttled, rate_limiter, store


@Throttled(
    key="/api/products",
    using=RateLimiterType.TOKEN_BUCKET.value,
    quota=rate_limiter.per_min(1),
    # 🌟 use RedisStore as storage
    store=store.RedisStore(server="redis://127.0.0.1:6379/0", options={"PASSWORD": ""}),
)
async def products() -> list:
    return [{"name": "iPhone"}, {"name": "MacBook"}]


async def demo():
    await products()
    # >> throttled.exceptions.LimitedError:
    # Rate limit exceeded: remaining=0, reset_after=60, retry_after=60.
    await products()


if __name__ == "__main__":
    asyncio.run(demo())
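To illustrate why a Redis-backed store suits distributed setups, here is a rough sketch of a fixed-window counter built on two Redis commands, INCR and EXPIRE. This is not how RedisStore works internally, and it is a simpler technique than the token bucket used in the example above. The names CounterClient, FakeClient, and allow are hypothetical. FakeClient is an in-process stand-in so the snippet runs without a server.

```python
from typing import Dict, Protocol


class CounterClient(Protocol):
    """The two commands the sketch needs; redis-py's client offers both."""

    def incr(self, name: str, amount: int = 1) -> int: ...

    def expire(self, name: str, time: int) -> bool: ...


def allow(client: CounterClient, key: str, limit: int, window_seconds: int) -> bool:
    """Fixed-window check: count hits on a shared key that expires with the window."""
    count = client.incr(key)
    if count == 1:
        # First hit in this window: start the countdown for the key.
        client.expire(key, window_seconds)
    return count <= limit


class FakeClient:
    """In-process stand-in (assumption); it records TTLs but never expires keys."""

    def __init__(self) -> None:
        self.counts: Dict[str, int] = {}
        self.ttls: Dict[str, int] = {}

    def incr(self, name: str, amount: int = 1) -> int:
        self.counts[name] = self.counts.get(name, 0) + amount
        return self.counts[name]

    def expire(self, name: str, time: int) -> bool:
        self.ttls[name] = time
        return True


client = FakeClient()
print(allow(client, "/api/products", limit=1, window_seconds=60))  # True
print(allow(client, "/api/products", limit=1, window_seconds=60))  # False
```

With redis-py installed and a server running, a client created with redis.Redis.from_url("redis://127.0.0.1:6379/0") could be passed in place of FakeClient. Every process that talks to the same server then increments the same counter, which is what makes the limit hold across processes. Because INCR and EXPIRE are issued as two separate commands here, the sketch is not atomic and a production limiter would need to handle that.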