
Storage backends

CheckpointStorage interface, LocalFileStorage, S3Storage, and the StorageFactory.


Storage backends are how CheckpointManager persists records. yoink ships two — local files and S3 — and the interface is small enough to add your own (Redis, GCS, Azure Blob, etc.).

from yoink.storage import (
    CheckpointStorage,   # abstract base
    LocalFileStorage,
    S3Storage,
    StorageFactory,
)

CheckpointStorage interface

Every backend implements five async methods:

from abc import ABC, abstractmethod
from typing import AsyncIterator

class CheckpointStorage(ABC):
    @abstractmethod
    async def write(self, data: str) -> None: ...
 
    @abstractmethod
    async def read(self) -> AsyncIterator[str]: ...
 
    @abstractmethod
    async def exists(self) -> bool: ...
 
    @abstractmethod
    async def flush(self) -> None: ...
 
    @abstractmethod
    async def close(self) -> None: ...

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| write(data) | async | | Append data (typically a JSONL line with newline) to the checkpoint. |
| read() | async iterator | | Yield checkpoint contents line-by-line. |
| exists() | async | | Return True if the checkpoint exists. |
| flush() | async | | Ensure buffered writes are persisted. |
| close() | async | | Flush and release resources (file handles, S3 clients). |
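
To make the contract concrete, here is a minimal sketch of a backend that keeps everything in a Python list and round-trips JSONL records through it. InMemoryStorage and roundtrip are hypothetical names, and the abstract class is re-declared locally only so the snippet runs without yoink installed; in real code you would subclass yoink.storage.CheckpointStorage instead.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import AsyncIterator


class CheckpointStorage(ABC):
    """Local stand-in mirroring the interface above (assumption: same five methods)."""

    @abstractmethod
    async def write(self, data: str) -> None: ...

    @abstractmethod
    async def read(self) -> AsyncIterator[str]: ...

    @abstractmethod
    async def exists(self) -> bool: ...

    @abstractmethod
    async def flush(self) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...


class InMemoryStorage(CheckpointStorage):
    """Hypothetical backend that keeps the checkpoint in a Python list."""

    def __init__(self) -> None:
        self._lines: list[str] = []

    async def write(self, data: str) -> None:
        self._lines.append(data)

    async def read(self) -> AsyncIterator[str]:
        # An async generator: callers consume it with `async for`.
        for line in self._lines:
            yield line

    async def exists(self) -> bool:
        return bool(self._lines)

    async def flush(self) -> None:
        pass  # nothing is buffered outside the list

    async def close(self) -> None:
        pass  # no file handles or clients to release


async def roundtrip(records: list[dict]) -> list[dict]:
    """Write each record as one JSONL line, then parse everything back."""
    storage = InMemoryStorage()
    for record in records:
        await storage.write(json.dumps(record) + "\n")
    await storage.flush()
    loaded = [json.loads(line) async for line in storage.read()]
    await storage.close()
    return loaded


if __name__ == "__main__":
    print(asyncio.run(roundtrip([{"url": "https://example.com"}])))
```

A backend like this is mostly useful in unit tests, where you want checkpoint behavior without touching disk or the network.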

LocalFileStorage

Async append to a local file via aiofiles.

LocalFileStorage(path: str)
storage = LocalFileStorage("./crawl.jsonl")
  • Opens the file in append mode on first write().
  • flush() calls flush() on the underlying file handle. That hands the data to the OS, which may still hold it in its page cache; follow it with os.fsync() if the checkpoint must survive a crash or power loss.
  • close() closes the file handle.
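
The difference between flush() and fsync can be sketched with the standard library alone. This is not how LocalFileStorage is implemented (it uses aiofiles); durable_append and demo are hypothetical helpers that only illustrate the two steps a write passes through on its way to disk.

```python
import os
import tempfile


def durable_append(path: str, line: str) -> None:
    """Append one line and force it to stable storage before returning."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(line)
        fh.flush()             # Python's buffer -> OS page cache
        os.fsync(fh.fileno())  # OS page cache -> disk


def demo() -> str:
    """Append two JSONL lines to a temporary file and return its contents."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "crawl.jsonl")
        durable_append(path, '{"url": "https://example.com"}\n')
        durable_append(path, '{"url": "https://example.org"}\n')
        with open(path, encoding="utf-8") as fh:
            return fh.read()


if __name__ == "__main__":
    print(demo(), end="")
```

os.fsync blocks until the device acknowledges the write, so in async code it is usually run through asyncio.to_thread, and calling it after every record can slow a crawl noticeably.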

S3Storage

Buffered S3 backend using aioboto3. Requires the [s3] extra.

S3Storage(uri: str)  # s3://bucket/key
storage = S3Storage("s3://my-bucket/crawls/site-a.jsonl")

Behavior:

  • write() buffers in memory.
  • flush() downloads the existing object (if any), appends the buffer, and re-uploads the result via put_object. The full rewrite is necessary because standard S3 objects cannot be appended to in place.
  • read() does a single get_object and yields lines.
  • exists() does head_object.
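
The buffer-then-rewrite behavior of write() and flush() can be sketched against an in-memory stand-in for a bucket. FakeObjectStore and BufferedAppender are hypothetical names, nothing here calls aioboto3, and skipping the upload when the buffer is empty is an assumption of this sketch rather than documented S3Storage behavior.

```python
import asyncio
from typing import Optional


class FakeObjectStore:
    """Hypothetical in-memory stand-in for a bucket (not aioboto3)."""

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}
        self.bytes_uploaded = 0

    async def get(self, key: str) -> Optional[bytes]:
        return self.objects.get(key)

    async def put(self, key: str, body: bytes) -> None:
        self.objects[key] = body
        self.bytes_uploaded += len(body)


class BufferedAppender:
    """Emulates append on a store that only supports whole-object writes."""

    def __init__(self, store: FakeObjectStore, key: str) -> None:
        self.store = store
        self.key = key
        self._buffer: list[str] = []

    async def write(self, data: str) -> None:
        self._buffer.append(data)  # held in memory until flush()

    async def flush(self) -> None:
        if not self._buffer:
            return  # assumption: nothing new, so skip the round trip
        existing = await self.store.get(self.key) or b""
        body = existing + "".join(self._buffer).encode("utf-8")
        await self.store.put(self.key, body)  # re-upload the whole object
        self._buffer.clear()


async def demo() -> tuple[bytes, int]:
    store = FakeObjectStore()
    appender = BufferedAppender(store, "crawls/site-a.jsonl")
    await appender.write("a\n")
    await appender.flush()
    await appender.write("b\n")
    await appender.flush()
    return store.objects["crawls/site-a.jsonl"], store.bytes_uploaded


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Two things follow from this pattern: every flush transfers the entire object, so flush cost grows with checkpoint size, and two writers flushing to the same key can overwrite each other's lines.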

Required IAM permissions

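{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["s3:GetObject", "s3:PutObject"],
    "Resource": "arn:aws:s3:::your-bucket-name/*"
  }]
}

There is no separate s3:HeadObject action; the head_object call used by exists() is authorized by s3:GetObject. Without s3:ListBucket on the bucket, S3 answers a HEAD request for a missing key with 403 instead of 404, so consider granting it (on arn:aws:s3:::your-bucket-name) if exists() reports access errors for checkpoints that have not been written yet.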

Credentials

aioboto3 uses the standard boto3 credential chain. Locally, run aws configure. On Lambda / EC2 / ECS, attach an IAM role to the runtime and S3Storage will pick it up automatically.

StorageFactory

Picks a backend based on URI scheme. This is what CheckpointManager.from_uri() uses internally.

StorageFactory.from_uri("./checkpoint.jsonl")
# → LocalFileStorage
 
StorageFactory.from_uri("/abs/path.jsonl")
# → LocalFileStorage
 
StorageFactory.from_uri("s3://bucket/key.jsonl")
# → S3Storage
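
The dispatch rule in the examples above can be approximated in a few lines. This is a sketch of the idea, not StorageFactory's actual code: pick_backend is a hypothetical helper that returns backend names as strings, and raising ValueError for unknown schemes is an assumption.

```python
from urllib.parse import urlsplit


def pick_backend(uri: str) -> str:
    """Return the name of the backend a URI would map to (illustrative only)."""
    if "://" not in uri:
        # No scheme separator: treat as a relative or absolute filesystem path.
        return "LocalFileStorage"
    scheme = urlsplit(uri).scheme.lower()
    if scheme == "s3":
        return "S3Storage"
    # Assumption: unknown schemes are rejected rather than treated as paths.
    raise ValueError(f"unsupported storage scheme: {scheme!r}")


if __name__ == "__main__":
    for uri in ("./checkpoint.jsonl", "/abs/path.jsonl", "s3://bucket/key.jsonl"):
        print(uri, "->", pick_backend(uri))
```

Checking for "://" before parsing keeps plain paths, including ones that contain a colon, from being mistaken for a URI scheme.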

Implementing a custom backend

Implementing the interface is roughly 80 lines. Here's a sketch for Redis:

import redis.asyncio as redis
from yoink.storage import CheckpointStorage
 
class RedisStreamStorage(CheckpointStorage):
    # Despite the name, records go into a Redis list (RPUSH/LRANGE), not a Redis stream.
    def __init__(self, url: str, key: str):
        self.client = redis.from_url(url)
        self.key = key
 
    async def write(self, data: str) -> None:
        await self.client.rpush(self.key, data)
 
    async def read(self):
        for raw in await self.client.lrange(self.key, 0, -1):
            yield raw.decode("utf-8")
 
    async def exists(self) -> bool:
        return bool(await self.client.exists(self.key))
 
    async def flush(self) -> None:
        # Nothing to do: RPUSH is applied server-side before the await returns,
        # so no writes are buffered in this class.
        pass
 
    async def close(self) -> None:
        await self.client.aclose()

Then plug it in:

# Assumes Crawler and CrawlConfig are exported from the top-level package.
from yoink import CheckpointManager, CrawlConfig, Crawler
 
storage = RedisStreamStorage("redis://localhost", "yoink:crawl-1")
checkpoint = CheckpointManager(storage=storage)
 
crawler = Crawler(config=CrawlConfig(), checkpoint_manager=checkpoint)
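
Before wiring a custom backend into a real crawl, it can help to run it through a quick round-trip check. The sketch below is one way to do that under stated assumptions: check_backend and ListStorage are hypothetical names, the storage is assumed to start empty, and trailing newlines are stripped before comparing because read() may or may not preserve them.

```python
import asyncio


async def check_backend(storage, lines: list[str]) -> bool:
    """Write lines, flush, and confirm read() yields them back in order."""
    for line in lines:
        await storage.write(line)
    await storage.flush()
    assert await storage.exists(), "exists() should be True after a flushed write"
    got = [chunk.rstrip("\n") async for chunk in storage.read()]
    await storage.close()
    return got == [line.rstrip("\n") for line in lines]


class ListStorage:
    """Hypothetical duck-typed backend used only to exercise the check."""

    def __init__(self) -> None:
        self._lines: list[str] = []

    async def write(self, data: str) -> None:
        self._lines.append(data)

    async def read(self):
        for line in self._lines:
            yield line

    async def exists(self) -> bool:
        return bool(self._lines)

    async def flush(self) -> None:
        pass

    async def close(self) -> None:
        pass


if __name__ == "__main__":
    print(asyncio.run(check_backend(ListStorage(), ["a\n", "b\n"])))
```

To test a real backend, pass an instance pointed at a fresh key or path in place of ListStorage.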

See also