Building a Programmable Temporary Email Infrastructure from Scratch
How we built a real API-first disposable email service with FastAPI, aiosmtpd, WebSockets, and AWS SES — designed for automation, CI pipelines, and AI agents.
FastAPI, SMTP, WebSockets & AWS
Temporary email services have existed since the early 2000s. Most of them are browser toys — you visit a page, get an address, maybe wait a few minutes, refresh, and copy a confirmation code. They work for human interaction. They fail completely for automation.
If you have ever tried to drive one of these services programmatically — from a test suite, a browser automation script, or an AI agent — you know the problems: no API, unreliable polling, addresses you can't control, expiration logic that races with your script, and no way to authenticate access. They are single-user, interactive services dressed up as infrastructure.
This article describes how we built a different thing: a programmable temporary email infrastructure with a real API, a real SMTP ingestion layer, deterministic expiration, WebSocket-based event propagation, and first-class support for AI agents via an MCP server. Everything is async, everything is observable, and the production environment is defined entirely in Terraform.
1. Why Disposable Email Tools Fail for Automation
The core problem is architectural. Consumer disposable email services are built for a human sitting at a browser. The UX model is: visit page → get address → wait → read. The automation model needs something fundamentally different.
What automation actually requires:
- An API to create mailboxes on demand, before the test starts
- Deterministic addresses that can be handed to the system under test before the email is sent
- Reliable event notification (not polling) when mail arrives
- Authenticated access so multiple test workers don't collide
- Controlled TTL so addresses expire on schedule, not randomly
- The ability to integrate into CI pipelines, agent loops, and headless processes
None of these are provided by consumer disposable email services because they were never designed for machines.
2. Architectural Overview
The system is composed of four distinct runtime concerns:
┌─────────────────────────────────────────────────────────────────┐
│ Email Ingestion │
│ │
│ [Dev] aiosmtpd (port 25) → MailHandler → core/delivery.py │
│ [Prod] SES → SNS → POST /api/v1/ses/inbound → delivery.py │
└────────────────────────────┬────────────────────────────────────┘
│
┌────────▼────────┐
│ core/delivery │ (shared logic)
│ ┌───────────┐ │
│ │ Mailbox │ │ ← PostgreSQL
│ │ Plan/Quota│ │
│ │ Message │ │
│ └─────┬─────┘ │
│ │ pub │
│ ┌─────▼─────┐ │
│ │ Redis │ │ ← pub/sub channel
│ └─────┬─────┘ │
└────────┼────────┘
│ subscribe
┌────────▼────────┐
│ FastAPI API │ ← REST + WebSocket
│ /api/v1/* │
│ /ws/inbox/{a} │
│ /api/v1/ses/* │
└────────┬────────┘
│
┌────────▼────────┐
│ MCP Server │ ← AI agent integration
│ stdio/tools │
└─────────────────┘
The email delivery path is completely decoupled from the API layer. Both the SMTP handler (aiosmtpd) and the SES webhook call the same core/delivery.py module. The API layer handles REST and WebSocket clients. Redis is the message bus between ingestion and real-time consumers.
Technology stack:
- Python 3.12, FastAPI, uvicorn (1 worker)
- SQLAlchemy 2.0 async + asyncpg + PostgreSQL 15
- redis-py async + hiredis
- aiosmtpd (dev SMTP receiver)
- AWS SES + SNS (production email receiver)
- Terraform for all AWS infrastructure
- Angular 17 frontend
3. SMTP Ingestion Layer
Development: aiosmtpd
For local development, email is received directly over SMTP using aiosmtpd, a pure-Python async SMTP server. The handler is intentionally minimal:
# smtp/handler.py
# smtp/handler.py
class MailHandler:
    async def handle_DATA(self, server, session, envelope) -> str:
        raw: bytes = envelope.content
        for rcpt in envelope.rcpt_tos:
            address = rcpt.lower().strip()
            if not address.endswith(f"@{settings.domain}"):
                logger.debug("Unrecognized domain: %s - ignored", address)
                continue
            await deliver_raw_email(raw, address)
        return "250 Message accepted"
The handler always returns 250 Message accepted regardless of outcome. This is intentional. Returning SMTP error codes (4xx, 5xx) would cause senders to retry, queue, or bounce — none of which are desirable for disposable infrastructure. Rejection is silent and local.
The SMTP server itself is started as a separate process in smtp/smtp_main.py:
async def main() -> None:
    handler = MailHandler()
    controller = Controller(
        handler,
        hostname=settings.smtp_host,
        port=settings.smtp_port,
    )
    controller.start()
    # Block until SIGTERM/SIGINT
    loop = asyncio.get_running_loop()
    stop_event = asyncio.Event()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop_event.set)
    await stop_event.wait()
    controller.stop()
Production: SES → SNS → Webhook
In production, opening port 25 on an EC2 instance is impractical — AWS blocks outbound port 25 on new accounts, ISPs block inbound 25, and running your own MTA requires managing deliverability, blocklists, and reputation. Instead, we use AWS SES Email Receiving.
The flow:
MX record → inbound-smtp.us-east-1.amazonaws.com (SES)
→ SES Receipt Rule (spam + virus scan enabled)
→ SNS Topic
→ HTTPS POST /api/v1/ses/inbound
→ core/delivery.py
The Terraform module provisions this entire chain:
resource "aws_ses_receipt_rule" "inbound" {
  name          = "deliver-to-sns"
  rule_set_name = aws_ses_receipt_rule_set.main.rule_set_name
  recipients    = [var.domain_name]
  enabled       = true
  scan_enabled  = true # SES runs spam + virus checks

  sns_action {
    topic_arn = aws_sns_topic.inbound_email.arn
    position  = 1
  }
}
resource "aws_route53_record" "inbound_mx" {
  zone_id = var.zone_id
  name    = var.domain_name
  type    = "MX"
  ttl     = 300
  records = ["10 ${local.ses_inbound_endpoint}"]
}
The SNS subscription points at the API endpoint with endpoint_auto_confirms = true. When Terraform applies and SNS sends a SubscriptionConfirmation request, the API handles it automatically:
if msg_type == "SubscriptionConfirmation":
    subscribe_url = payload.get("SubscribeURL", "")
    if not subscribe_url.startswith("https://sns."):
        return {"status": "rejected"}
    urllib.request.urlopen(subscribe_url, timeout=10)
    return {"status": "confirmed"}
In production, every incoming SNS message is verified against AWS's RSA-SHA1 certificate before processing. The certificate is fetched once and cached in memory. The certificate URL is validated against a strict regex to prevent SSRF:
_CERT_URL_RE = re.compile(
    r"^https://sns\.[a-z0-9-]+\.amazonaws\.com/.*\.pem$"
)
This validation is skipped in dev environment to allow local testing without valid SNS credentials.
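The SSRF guard is easy to exercise in isolation. The sketch below reuses the same pattern as above; the rejected URLs are illustrative attack shapes, not cases taken from the codebase:

```python
import re

# Same pattern as the handler: only *.pem files hosted on an
# sns.<region>.amazonaws.com domain are accepted.
_CERT_URL_RE = re.compile(r"^https://sns\.[a-z0-9-]+\.amazonaws\.com/.*\.pem$")

def is_valid_cert_url(url: str) -> bool:
    """Return True only for certificate URLs matching the strict pattern."""
    return _CERT_URL_RE.match(url) is not None

assert is_valid_cert_url(
    "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-abc123.pem"
)
assert not is_valid_cert_url("http://sns.us-east-1.amazonaws.com/cert.pem")  # not HTTPS
assert not is_valid_cert_url(
    "https://evil.example.com/sns.us-east-1.amazonaws.com/x.pem"  # wrong host
)
```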
Shared Delivery Core
Both ingestion paths call the same function:
# core/delivery.py
async def deliver_raw_email(raw: bytes, address: str) -> bool:
    async with AsyncSessionLocal() as db:
        return await _save_to_mailbox(raw, address, db)
The internal _save_to_mailbox function handles the full pipeline: find active non-expired mailbox → resolve plan (owner's plan for authenticated users, free plan for anonymous) → check message quota → parse email → insert into DB → publish to Redis. Returns True if the message was saved, False for any silent rejection.
Email Parsing
Raw email bytes are stored verbatim. Separately, they are parsed into structured fields using Python's standard email module with the modern policy API:
def parse_email(raw: bytes) -> ParsedEmail:
    msg = email.message_from_bytes(raw, policy=policy.default)
    # Walk multipart, extract text/plain, text/html, attachment metadata
    ...
    return ParsedEmail(
        from_address=from_address,
        to_address=to_address,
        subject=subject,
        body_text=body_text,
        body_html=body_html,
        attachments=attachments,
    )
Attachment metadata (filename, content type, size, content-id) is stored as JSONB. The binary content stays in raw_email. This means you can re-parse the original bytes later with a different parser if your extraction needs change, without data loss.
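To make the walk concrete, here is a simplified, self-contained version of the parsing step. It returns a plain dict rather than the ParsedEmail model, and the extraction logic is a sketch of the approach, not the production code:

```python
import email
from email import policy
from email.message import EmailMessage

def parse_email(raw: bytes) -> dict:
    """Parse raw RFC 5322 bytes into structured fields (simplified sketch)."""
    msg = email.message_from_bytes(raw, policy=policy.default)
    body_text = body_html = None
    attachments = []
    for part in msg.walk():
        if part.is_multipart():
            continue  # containers carry no payload themselves
        if part.get_content_disposition() == "attachment":
            attachments.append({
                "filename": part.get_filename(),
                "content_type": part.get_content_type(),
                "size": len(part.get_payload(decode=True) or b""),
            })
        elif part.get_content_type() == "text/plain" and body_text is None:
            body_text = part.get_content()
        elif part.get_content_type() == "text/html" and body_html is None:
            body_html = part.get_content()
    return {
        "from_address": msg["From"],
        "to_address": msg["To"],
        "subject": msg["Subject"],
        "body_text": body_text,
        "body_html": body_html,
        "attachments": attachments,
    }

# Build a sample message and round-trip it through the parser.
sample = EmailMessage()
sample["From"] = "noreply@example.com"
sample["To"] = "happy-tiger-42@uncorreotemporal.com"
sample["Subject"] = "Your code"
sample.set_content("Your code is 847291")
parsed = parse_email(sample.as_bytes())
assert parsed["subject"] == "Your code"
assert parsed["body_text"].strip() == "Your code is 847291"
```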
4. API Layer Design
The API exposes six core mailbox endpoints under /api/v1, plus the SES webhook, a WebSocket route, and a health check:
POST /api/v1/mailboxes → Create mailbox
GET /api/v1/mailboxes → List mailboxes (auth required)
DELETE /api/v1/mailboxes/{address} → Soft-delete mailbox
GET /api/v1/mailboxes/{address}/messages → List messages
GET /api/v1/mailboxes/{address}/messages/{id} → Get message (marks as read)
DELETE /api/v1/mailboxes/{address}/messages/{id} → Delete message
POST /api/v1/ses/inbound → SES/SNS webhook
WS /ws/inbox/{address} → Real-time event stream
GET /health → DB + Redis liveness
The create endpoint is zero-friction for anonymous use:
# POST /api/v1/mailboxes
@router.post("/mailboxes", status_code=201)
async def create_mailbox(
    ttl_minutes: int | None = Query(None, ge=1),
    auth: AuthContext = Depends(get_any_auth),
    db: AsyncSession = Depends(get_db),
):
    plan = await _get_plan("free", db)  # or auth.user.plan for authenticated
    # Enforce mailbox quota
    if active_count >= plan.max_mailboxes:
        raise HTTPException(429, detail=f"Limit of {plan.max_mailboxes} active mailboxes reached")
    # TTL is clamped to plan limits
    effective_ttl = min(ttl_minutes or plan.default_ttl_minutes, plan.max_ttl_minutes)
    # Generate address with collision retry
    for attempt in range(_MAX_ADDRESS_RETRIES):
        address = generate_friendly_address(settings.domain)
        ...
    return {"address": address, "expires_at": ..., "session_token": ...}
Addresses are human-readable adjective-noun-number combinations (happy-tiger-42@uncorreotemporal.com). The generator retries up to 5 times on collision before returning 503. With a large enough namespace this virtually never happens, but the fallback exists.
5. Database and Expiration Logic
Schema
Two primary tables beyond users and plans:
mailboxes — indexed on (expires_at, is_active) and (owner_id, is_active):
class Mailbox(Base):
    __tablename__ = "mailboxes"
    __table_args__ = (
        Index("ix_mailboxes_expires_at_is_active", "expires_at", "is_active"),
        Index("ix_mailboxes_owner_id_is_active", "owner_id", "is_active"),
    )
    id: Mapped[uuid.UUID]               # primary key
    address: Mapped[str]                # unique, indexed
    expires_at: Mapped[datetime]
    owner_type: Mapped[OwnerType]       # anonymous | api | mcp
    owner_id: Mapped[uuid.UUID | None]  # NULL for anonymous
    session_token: Mapped[str | None]   # NULL for api/mcp
    is_active: Mapped[bool]
messages — indexed on (mailbox_id, received_at):
class Message(Base):
    id: Mapped[uuid.UUID]
    mailbox_id: Mapped[uuid.UUID]     # FK → mailboxes, CASCADE DELETE
    raw_email: Mapped[bytes]          # LargeBinary, complete RFC 2822
    attachments: Mapped[list | None]  # JSONB metadata only
    body_text: Mapped[str | None]
    body_html: Mapped[str | None]
    received_at: Mapped[datetime]
    is_read: Mapped[bool]
Expiration
Expiration is handled by a background asyncio task running inside the API process, not a cron job or external scheduler:
# core/expiry.py
async def _expire_mailboxes() -> int:
    now = datetime.now(timezone.utc)
    async with AsyncSessionLocal() as db:
        result = await db.execute(
            update(Mailbox)
            .where(
                Mailbox.expires_at <= now,
                Mailbox.is_active == True,
            )
            .values(is_active=False)
        )
        await db.commit()
        return result.rowcount
The task runs every 60 seconds using asyncio.wait_for with a timeout as the sleep mechanism. This allows the stop_event to interrupt the loop immediately on shutdown, rather than waiting for the full interval:
async def _run_expiry_loop(interval_seconds: int = 60) -> None:
    while True:
        await _expire_mailboxes()
        try:
            await asyncio.wait_for(_stop_event.wait(), timeout=interval_seconds)
            break  # stop signal received
        except asyncio.TimeoutError:
            pass  # normal interval expiry, continue
The task is launched from the FastAPI lifespan:
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    start_expiry_task(interval_seconds=60)
    yield
    await stop_expiry_task()
    await close_redis()
This design has an explicit constraint documented in the code: it is only compatible with --workers 1. Running multiple uvicorn workers would cause N parallel expiry tasks all competing to mark the same rows. For scale beyond a single worker, this would need a distributed lock (Redlock or similar) or migration to a dedicated scheduler (Celery Beat, APScheduler with Redis backend).
Plan-Based Quotas
Every mailbox and message creation is gated through the quota system:
# core/quota.py
async def check_message_quota(
    mailbox: Mailbox, plan: Plan, db: AsyncSession
) -> bool:
    if plan.max_messages_per_mailbox == -1:
        return True  # unlimited
    result = await db.execute(
        select(func.count(Message.id)).where(Message.mailbox_id == mailbox.id)
    )
    count = result.scalar_one()
    return count < plan.max_messages_per_mailbox

def calculate_ttl(requested_ttl_minutes: int | None, plan: Plan) -> int:
    ttl = requested_ttl_minutes or plan.default_ttl_minutes
    return max(1, min(ttl, plan.max_ttl_minutes))
Plans are database-driven rows, not hardcoded constants. Changing plan limits requires updating the plans table, not a code deploy.
6. Real-Time Event Propagation with Redis + WebSockets
The Pub/Sub Architecture
When a message is saved to the database, core/delivery.py publishes a JSON event to a Redis channel named mailbox:{address}:
redis = await get_redis()
payload = json.dumps({"event": "new_message", "message_id": str(message.id)})
await redis.publish(f"mailbox:{address}", payload)
If Redis is unavailable, the publish failure is logged and swallowed — the message is already in the database, so persistence is not at risk.
WebSocket Endpoint
The WebSocket endpoint subscribes to the Redis channel and forwards events to connected clients:
@router.websocket("/ws/inbox/{address}")
async def websocket_inbox(
    websocket: WebSocket,
    address: str,
    token: str | None = None,
    api_key: str | None = None,
):
    authorized = await _authenticate(address, token, api_key)
    if not authorized:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        return
    await websocket.accept()

    redis = await get_redis()
    pubsub = redis.pubsub()
    await pubsub.subscribe(f"mailbox:{address}")

    async def _send_loop():
        async for msg in pubsub.listen():
            if msg["type"] == "message":
                await websocket.send_text(msg["data"])

    async def _ping_loop():
        while True:
            await asyncio.sleep(30)
            await websocket.send_text(json.dumps({"event": "ping"}))

    send_task = asyncio.create_task(_send_loop())
    ping_task = asyncio.create_task(_ping_loop())
    done, pending = await asyncio.wait(
        [send_task, ping_task],
        return_when=asyncio.FIRST_COMPLETED,
    )
    for task in pending:
        task.cancel()
    # cleanup: unsubscribe, close pubsub
Two concurrent tasks run per connection: _send_loop listens to Redis and forwards events, and _ping_loop sends keepalive pings every 30 seconds to prevent proxy timeouts. When either task completes (due to disconnect or error), the other is cancelled and the pubsub connection is cleaned up.
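The first-completed-cancels-the-rest pattern is worth isolating, because it is the core of the connection lifecycle. This standalone sketch replaces the Redis and WebSocket loops with dummy coroutines to show the mechanics:

```python
import asyncio

async def demo() -> str:
    """Two concurrent loops; whichever finishes first cancels the other."""
    async def short_task():
        await asyncio.sleep(0.01)  # stands in for a client disconnect
        return "done"

    async def endless_task():
        while True:                # stands in for the keepalive ping loop
            await asyncio.sleep(1)

    t1 = asyncio.create_task(short_task())
    t2 = asyncio.create_task(endless_task())
    done, pending = await asyncio.wait(
        {t1, t2}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()              # mirrors the WebSocket cleanup path
    return done.pop().result()

result = asyncio.run(demo())
assert result == "done"
```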
Why Not Polling
Polling at any reasonable interval wastes resources on both client and server. At 1-second polling, a test suite with 50 concurrent workers makes 3,000 requests per minute to fetch messages that haven't arrived yet. With WebSockets, those 50 workers maintain 50 persistent connections. When a message arrives, it is pushed once. There are no empty responses, no wasted round trips, and no tradeoff between latency and request volume.
For AI agents and automated systems where message arrival time is unpredictable, the difference between polling and push is the difference between a script that works and one that requires tuning and monitoring.
7. Authentication and Security Model
The system supports three authentication contexts, modeled as an OwnerType enum:
class OwnerType(str, enum.Enum):
    anonymous = "anonymous"  # session_token, no account
    api = "api"              # Bearer API key, registered user
    mcp = "mcp"              # MCP server access, registered user
Anonymous Access
Anonymous users receive a session_token (a secrets.token_urlsafe(32)) in the mailbox creation response. This token is stored in the mailbox row and must be presented in subsequent requests to read or delete messages. It is not stored hashed because it is not a credential — it is a random short-lived identifier tied to a single mailbox.
API Key Access
Authenticated users authenticate with a Bearer API key. Keys are stored as SHA-256 hashes:
# ws/inbox.py - WebSocket authentication
if api_key:
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    ak_result = await db.execute(
        select(ApiKey).where(
            ApiKey.key_hash == key_hash,
            ApiKey.is_active == True,
        )
    )
    api_key_row = ak_result.scalar_one_or_none()
    if api_key_row is None:
        return False  # unknown or revoked key
    return str(mailbox.owner_id) == str(api_key_row.user_id)
The raw key is never stored. If the database is compromised, leaked hashes cannot be used to authenticate. The ownership check (mailbox.owner_id == api_key_row.user_id) ensures that a valid API key cannot access another user's mailboxes.
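The issue-then-verify cycle is straightforward to sketch with the standard library. The function names here are illustrative, not the service's actual helpers:

```python
import hashlib
import secrets

def issue_api_key() -> tuple[str, str]:
    """Return (raw_key, key_hash); only the hash is persisted."""
    raw_key = secrets.token_urlsafe(32)
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    return raw_key, key_hash

def verify_api_key(presented: str, stored_hash: str) -> bool:
    """Hash the presented key and compare in constant time."""
    candidate = hashlib.sha256(presented.encode()).hexdigest()
    return secrets.compare_digest(candidate, stored_hash)

raw, stored = issue_api_key()
assert verify_api_key(raw, stored)
assert not verify_api_key("wrong-key", stored)
```

Note that SHA-256 without a work factor is acceptable here because the keys are high-entropy random tokens, not human-chosen passwords.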
MCP Server Authentication
The MCP server authenticates with a single environment variable (UCT_API_KEY) and exposes tools directly to AI agents via stdio. It communicates over stdio and calls the service layer directly — no HTTP round-trips inside the same process.
Production Security Notes
In production, OpenAPI docs are disabled:
app = FastAPI(
    docs_url="/docs" if settings.environment != "prod" else None,
    redoc_url="/redoc" if settings.environment != "prod" else None,
)
Redis uses TLS (rediss://) in production automatically, based on the URL scheme in REDIS_URL. CORS origins are configured explicitly from the environment; there is no wildcard in production.
8. AWS and Production Infrastructure
All infrastructure is provisioned by Terraform, organized into reusable modules.
Infrastructure Dependency Graph
route53_zone ──┐
├──▶ acm ──▶ alb ──▶ route53 (records)
vpc ──┐ │
iam ──┼──▶ ec2─┘
├────────────▶ rds (Multi-AZ, 7-day backups)
└────────────▶ redis (replication group, 2 nodes)
route53_zone ──▶ ses (domain identity, DKIM, receipt rules, SNS)
(standalone) ecr (3 repos: api, smtp, front)
EC2: Single instance behind an ALB, using an instance profile for ECR access — no static AWS credentials stored on the machine. The API runs as a Docker container managed by Compose.
RDS: PostgreSQL 15, Multi-AZ in production, automated backups retained for 7 days. The endpoint is marked sensitive in Terraform outputs and never logged.
SES + DNS: DKIM records are provisioned automatically from the SES DKIM tokens:
resource "aws_route53_record" "dkim" {
  for_each = toset(aws_ses_domain_dkim.main.dkim_tokens)
  zone_id  = var.zone_id
  name     = "${each.value}._domainkey.${var.domain_name}"
  type     = "CNAME"
  ttl      = 300
  records  = ["${each.value}.dkim.amazonses.com"]
}
SPF is set on the custom MAIL FROM subdomain (mail.uncorreotemporal.com) to authorize SES as a sender for outbound notifications.
ECR: Three repositories (uncorreotemporal-api, uncorreotemporal-smtp, uncorreotemporal-front), each with a lifecycle policy retaining the last 10 images. They are provisioned using for_each:
locals {
  repos = toset(["uncorreotemporal-api", "uncorreotemporal-smtp", "uncorreotemporal-front"])
}

resource "aws_ecr_repository" "app" {
  for_each             = local.repos
  name                 = each.key
  image_tag_mutability = "MUTABLE"
}
CI/CD: GitHub Actions builds Docker images, pushes to ECR tagged with the commit SHA, SCPs the Compose file to EC2, then SSHs in and runs docker compose pull && docker compose up -d. The EC2 instance pulls images using its IAM role — no AWS credentials are stored as GitHub secrets beyond the deployment user's access key.
9. AI Agent Integration Use Cases
This is where the infrastructure nature of the system becomes most apparent.
The Problem Agents Have With Email
Modern AI agents are routinely asked to register for services, verify accounts, complete onboarding flows, and extract information from transactional email. This requires:
- A real email address that can receive mail
- The ability to register that address before the signup flow starts
- Event-driven notification when mail arrives (not polling)
- Programmatic extraction of the email body
Consumer disposable email services fail at all four points for agents. They have no API. They have no push notification. Their addresses are shared or predictable. They expire unpredictably.
MCP Integration
The MCP (Model Context Protocol) server exposes five tools directly to AI agents via stdio:
- create_mailbox: provisions a new mailbox with a specified TTL
- list_mailboxes: returns all active mailboxes for this agent
- get_messages: lists messages in a mailbox (subject, from, received_at)
- read_message: returns the full body of a specific message
- delete_mailbox: explicitly terminates a mailbox
An agent configured with UCT_API_KEY can drive the full email verification cycle without making HTTP requests. The MCP server handles transport and authentication; the agent sees named tools with typed inputs and outputs.
REST Integration for Agent Frameworks
For agents that operate via HTTP (LangChain, AutoGen, custom orchestrators), the REST + WebSocket API is sufficient:
# 1. Create a mailbox with a 10-minute TTL
RESP=$(curl -s -X POST "https://uncorreotemporal.com/api/v1/mailboxes?ttl_minutes=10" \
-H "Authorization: Bearer $UCT_API_KEY")
ADDRESS=$(echo "$RESP" | jq -r '.address')
# 2. Register on a third-party service using $ADDRESS
# 3. Open a WebSocket and wait for mail to arrive
# wss://uncorreotemporal.com/ws/inbox/$ADDRESS?api_key=$UCT_API_KEY
# → {"event": "new_message", "message_id": "uuid"}
# 4. Fetch the message body
curl -s "https://uncorreotemporal.com/api/v1/mailboxes/$ADDRESS/messages/$MESSAGE_ID" \
-H "Authorization: Bearer $UCT_API_KEY"
# → {"body_text": "Your code is 847291", "body_html": "..."}
# 5. Clean up
curl -s -X DELETE "https://uncorreotemporal.com/api/v1/mailboxes/$ADDRESS" \
-H "Authorization: Bearer $UCT_API_KEY"
The typical agent loop takes under a second from mailbox creation to message retrieval once the sending service delivers.
Concrete Scenarios
Account registration testing: A QA automation agent creates a unique mailbox per test run, registers on the system under test, waits for the confirmation email via WebSocket, extracts the verification link from body_html, and clicks it — all without manual intervention and with no shared state between test runs.
OTP extraction: Many services send one-time codes over email. An agent can open a WebSocket, trigger the OTP send, parse body_text with a regex, and proceed — without storing credentials or maintaining session state across environments.
Parallel test isolation: Multiple CI workers can each create their own mailboxes in the same second, register independently, and receive mail without any coordination mechanism. Each mailbox is isolated by address and by authenticated owner.
Webhook pipeline testing: An agent can provision a mailbox, configure it as the notification address in a third-party integration, trigger the integration, and verify the notification was received and correctly formatted.
10. Scaling Considerations
The current architecture targets single-instance deployments with clear scaling paths identified.
Database: PostgreSQL on RDS with Multi-AZ handles hundreds of concurrent connections with PgBouncer or SQLAlchemy connection pooling (currently pool_size=5, max_overflow=10). Read replicas are straightforward for message listing queries if needed.
Redis: ElastiCache with a replication group provides failover. Pub/sub is stateless — subscribers can connect to any replica.
The single-worker constraint: The asyncio expiry task is the binding constraint on horizontal scaling. The straightforward path is a dedicated Celery Beat worker or an external scheduler (AWS EventBridge → Lambda) calling a maintenance endpoint. This decouples expiration from the API process and makes the API stateless.
WebSocket connections: Each WebSocket connection holds one Redis pub/sub subscription. At high connection counts this creates pressure on Redis's subscription table. Moving to Redis Streams or a message broker (NATS, RabbitMQ) with fan-out semantics would scale further, but Redis pub/sub handles tens of thousands of subscriptions without issue on a cache.t3.small.
Email volume: SES handles email ingestion scaling transparently. The SNS webhook endpoint is stateless and can run behind the ALB on multiple API instances once the expiry task constraint is resolved.
11. Final Thoughts
Most email utilities are tools. Tools solve a specific problem in a specific context for a human operator. This system is infrastructure — a programmable layer that other systems can depend on.
The distinction matters in practice. Infrastructure needs an API. Infrastructure needs deterministic behavior. Infrastructure needs authentication boundaries. Infrastructure needs to compose with other systems, including AI agents.
The choices described in this article — real SMTP ingestion, async-first architecture, plan-based quotas, WebSocket event propagation, Terraform-managed AWS infrastructure — each address a concrete failure mode in the alternative approach.
Real SMTP via SES means you get actual email, with real deliverability, real spam filtering, and real DKIM signatures — not a simulation. Async architecture means hundreds of concurrent WebSocket connections and simultaneous database queries without thread contention. Deterministic expiration means test environments clean up after themselves without a human operator. WebSockets mean agents get pushed events instead of polling and hoping.
If you are building test automation, browser agents, or AI-driven workflows that need to interact with email, a programmable API over a real email infrastructure is the missing piece. Try the REST API at https://uncorreotemporal.com/api/v1/mailboxes — no signup required for anonymous use.
Written by
Software Engineer · Sr. Python Developer · AWS Certified Solutions Architect
Software engineer with 20 years of experience building Python backends, cloud infrastructure, and AI agent tooling. Builder of UnCorreoTemporal.