Storage backends
CheckpointStorage interface, LocalFileStorage, S3Storage, and the StorageFactory.
Storage backends are how CheckpointManager persists records. yoink ships two — local files and S3 — and the interface is small enough to add your own (Redis, GCS, Azure Blob, etc.).
from yoink.storage import (
    CheckpointStorage,  # abstract base
    LocalFileStorage,
    S3Storage,
    StorageFactory,
)

CheckpointStorage interface
Every backend implements five async methods:
class CheckpointStorage(ABC):
    @abstractmethod
    async def write(self, data: str) -> None: ...

    @abstractmethod
    async def read(self) -> AsyncIterator[str]: ...

    @abstractmethod
    async def exists(self) -> bool: ...

    @abstractmethod
    async def flush(self) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...

| Name | Type | Default | Description |
|---|---|---|---|
| write(data) | async | — | Append data (typically a JSONL line with newline) to the checkpoint. |
| read() | async iterator | — | Yield checkpoint contents line-by-line. |
| exists() | async | — | Return True if the checkpoint exists. |
| flush() | async | — | Ensure buffered writes are persisted. |
| close() | async | — | Flush and release resources (file handles, S3 clients). |
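To make the contract concrete, here is a minimal in-memory backend of the kind that could be handy in unit tests. It is only a sketch: the abstract base is redefined locally so the example runs without yoink installed (in real code you would subclass yoink.storage.CheckpointStorage instead), and InMemoryStorage, demo, and the choice to yield lines without their trailing newline are assumptions, not part of the library.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import AsyncIterator, List


class CheckpointStorage(ABC):
    """Local stand-in mirroring the five-method interface shown above."""

    @abstractmethod
    async def write(self, data: str) -> None: ...

    @abstractmethod
    async def read(self) -> AsyncIterator[str]: ...

    @abstractmethod
    async def exists(self) -> bool: ...

    @abstractmethod
    async def flush(self) -> None: ...

    @abstractmethod
    async def close(self) -> None: ...


class InMemoryStorage(CheckpointStorage):
    """Hypothetical backend that keeps the checkpoint in a Python list."""

    def __init__(self) -> None:
        self._pending: List[str] = []    # written but not yet flushed
        self._persisted: List[str] = []  # visible to read() and exists()
        self._closed = False

    async def write(self, data: str) -> None:
        if self._closed:
            raise RuntimeError("storage is closed")
        self._pending.append(data)

    async def read(self) -> AsyncIterator[str]:
        # Assumption: lines are yielded without their trailing newline.
        for chunk in self._persisted:
            for line in chunk.splitlines():
                yield line

    async def exists(self) -> bool:
        return bool(self._persisted)

    async def flush(self) -> None:
        # Move buffered writes into the "persisted" list.
        self._persisted.extend(self._pending)
        self._pending.clear()

    async def close(self) -> None:
        # Mirrors the table above: close() flushes, then releases resources.
        await self.flush()
        self._closed = True


async def demo() -> List[str]:
    storage = InMemoryStorage()
    await storage.write('{"url": "https://example.com/a"}\n')
    await storage.write('{"url": "https://example.com/b"}\n')
    await storage.flush()
    lines = [line async for line in storage.read()]
    await storage.close()
    return lines


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keeping pending and persisted data separate makes the difference between write() and flush() observable, which is the part of the contract a real backend is most likely to get wrong.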
LocalFileStorage
Async append to a local file via aiofiles.
LocalFileStorage(path: str)

storage = LocalFileStorage("./crawl.jsonl")

- Opens the file in append mode on first write().
- flush() calls the underlying flush() on the file handle (the OS may still buffer the data before it reaches disk; pair with fsync if you need durability guarantees beyond the crawl).
- close() closes the file handle.
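The flush-versus-fsync distinction mentioned above can be illustrated with the standard library alone. This is a synchronous sketch, not how LocalFileStorage is implemented; append_durably is a hypothetical helper name.

```python
import os
import tempfile


def append_durably(path: str, line: str) -> None:
    """Append one line and ask the OS to commit it to disk before returning."""
    with open(path, "a", encoding="utf-8") as fh:
        fh.write(line)
        fh.flush()             # Python's buffer -> OS page cache
        os.fsync(fh.fileno())  # OS page cache -> storage device


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "crawl.jsonl")
    append_durably(path, '{"url": "https://example.com"}\n')
    with open(path, encoding="utf-8") as fh:
        print(fh.read(), end="")
```

os.fsync blocks until the device acknowledges the write, so in async code it is usually worth running it off the event loop, for example with asyncio.to_thread.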
S3Storage
Buffered S3 backend using aioboto3. Requires the [s3] extra.
S3Storage(uri: str)  # s3://bucket/key

storage = S3Storage("s3://my-bucket/crawls/site-a.jsonl")

Behavior:
- write() buffers in memory.
- flush() downloads the existing object (if any), appends the buffer, and re-uploads via put_object. This is necessary because S3 objects don't support append.
- read() does a single get_object and yields lines.
- exists() does head_object.
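The download, append, re-upload cycle described above can be sketched against an in-memory stand-in for a bucket. Nothing here talks to AWS or reflects S3Storage's actual code; FakeBucket and BufferedAppender are hypothetical names, and the byte counter exists only to show how the cost of each flush grows with the object.

```python
from typing import Dict, List, Optional


class FakeBucket:
    """In-memory stand-in for a store that only supports whole-object get/put."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}
        self.bytes_transferred = 0

    def get(self, key: str) -> Optional[bytes]:
        body = self.objects.get(key)
        if body is not None:
            self.bytes_transferred += len(body)
        return body

    def put(self, key: str, body: bytes) -> None:
        self.bytes_transferred += len(body)
        self.objects[key] = body


class BufferedAppender:
    """Emulates append on top of get/put by rewriting the whole object."""

    def __init__(self, bucket: FakeBucket, key: str) -> None:
        self.bucket = bucket
        self.key = key
        self._buffer: List[str] = []

    def write(self, data: str) -> None:
        self._buffer.append(data)  # nothing leaves memory yet

    def flush(self) -> None:
        if not self._buffer:
            return
        existing = self.bucket.get(self.key) or b""  # download current object
        new_body = existing + "".join(self._buffer).encode("utf-8")
        self.bucket.put(self.key, new_body)          # re-upload everything
        self._buffer.clear()


if __name__ == "__main__":
    bucket = FakeBucket()
    appender = BufferedAppender(bucket, "crawls/site-a.jsonl")
    appender.write("a\n")
    appender.flush()
    appender.write("b\n")
    appender.flush()
    print(bucket.objects["crawls/site-a.jsonl"], bucket.bytes_transferred)
```

Because every flush moves the whole object in both directions, flushing after each record gets more expensive as the checkpoint grows; batching several writes per flush keeps the total transfer down.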
Required IAM permissions
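{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": ["s3:GetObject", "s3:PutObject"],
    "Resource": "arn:aws:s3:::your-bucket-name/*"
  }]
}

head_object is authorized by s3:GetObject; IAM has no separate s3:HeadObject action. Without s3:ListBucket on the bucket itself, S3 answers a request for a missing key with 403 rather than 404.

Credentials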
aioboto3 uses the standard boto3 credential chain. Locally, run aws configure. On Lambda / EC2 / ECS, attach an IAM role to the runtime and S3Storage will pick it up automatically.
StorageFactory
Picks a backend based on URI scheme. This is what CheckpointManager.from_uri() uses internally.
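Scheme-based selection of this kind could be sketched with urllib.parse. The helper names pick_backend and split_s3_uri are hypothetical, and treating every non-s3 URI as a local path is an assumption drawn from the documented cases, not a description of StorageFactory's internals.

```python
from typing import Tuple
from urllib.parse import urlparse


def pick_backend(uri: str) -> str:
    """Return the name of the backend class a URI would map to."""
    if urlparse(uri).scheme == "s3":
        return "S3Storage"
    # Assumption: anything without an s3:// scheme is treated as a file path.
    return "LocalFileStorage"


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split s3://bucket/key into (bucket, key), rejecting malformed input."""
    parsed = urlparse(uri)
    key = parsed.path.lstrip("/")
    if parsed.scheme != "s3" or not parsed.netloc or not key:
        raise ValueError(f"expected s3://bucket/key, got {uri!r}")
    return parsed.netloc, key


if __name__ == "__main__":
    for uri in ("./checkpoint.jsonl", "/abs/path.jsonl", "s3://bucket/key.jsonl"):
        print(uri, pick_backend(uri))
    print(split_s3_uri("s3://my-bucket/crawls/site-a.jsonl"))
```

Validating the bucket and key at construction time surfaces a malformed URI immediately instead of at the first flush.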
StorageFactory.from_uri("./checkpoint.jsonl")
# → LocalFileStorage
StorageFactory.from_uri("/abs/path.jsonl")
# → LocalFileStorage
StorageFactory.from_uri("s3://bucket/key.jsonl")
# → S3Storage

Implementing a custom backend
Implementing the interface is roughly 80 lines. Here's a sketch for Redis:
from typing import AsyncIterator

import redis.asyncio as redis
from yoink.storage import CheckpointStorage


class RedisStreamStorage(CheckpointStorage):
    # Note: despite the name, records are kept in a Redis list (RPUSH/LRANGE).
    def __init__(self, url: str, key: str):
        self.client = redis.from_url(url)
        self.key = key

    async def write(self, data: str) -> None:
        # Append one record to the end of the list.
        await self.client.rpush(self.key, data)

    async def read(self) -> AsyncIterator[str]:
        # The client returns bytes by default, so decode each entry.
        for raw in await self.client.lrange(self.key, 0, -1):
            yield raw.decode("utf-8")

    async def exists(self) -> bool:
        return bool(await self.client.exists(self.key))

    async def flush(self) -> None:
        # Each RPUSH is sent to Redis immediately; nothing is buffered here.
        pass

    async def close(self) -> None:
        await self.client.aclose()

Then plug it in:
# Assumption: Crawler and CrawlConfig are exported from the top-level package.
from yoink import CheckpointManager, Crawler, CrawlConfig

storage = RedisStreamStorage("redis://localhost", "yoink:crawl-1")
checkpoint = CheckpointManager(storage=storage)
crawler = Crawler(config=CrawlConfig(), checkpoint_manager=checkpoint)