merge: inbound email processing from worktree/ageless-shale

Merge the mail.tm transport and email processing pipeline into main.
- Transport-agnostic email processor with mail.tm polling transport
- POST /mailgate webhook endpoint
- User/queue/ticket resolvers with stub user auto-creation
- Queue mail_alias column for email routing
- Migration renumbered to 0021 to follow main's 0019/0020

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Gjermund Høsøien Wiggen
2026-06-15 21:42:37 +02:00
parent 631365ab07
commit 1c92f488f7
11 changed files with 2700 additions and 1 deletions

View File

@@ -0,0 +1 @@
ALTER TABLE "queues" ADD COLUMN "mail_alias" text;

File diff suppressed because it is too large Load Diff

View File

@@ -148,6 +148,13 @@
"when": 1781552001000,
"tag": "0020_sla_tables",
"breakpoints": true
},
{
"idx": 21,
"version": "7",
"when": 1781552215621,
"tag": "0021_romantic_captain_midlands",
"breakpoints": true
}
]
}
}

View File

@@ -11,6 +11,12 @@ const configSchema = z.object({
SMTP_FROM: z.string().default('tessera@localhost'),
UPLOAD_DIR: z.string().default('./data/uploads'),
JWT_SECRET: z.string().default('tessera-dev-secret-change-in-production'),
// Inbound email
MAIL_TRANSPORT: z.enum(['mailtm', 'webhook', 'none']).default('none'),
MAILTM_POLL_SECONDS: z.coerce.number().int().positive().default(30),
MAILTM_ADDRESS: z.string().optional(),
MAILTM_ACCOUNT_ID: z.string().optional(),
MAILTM_TOKEN: z.string().optional(),
});
export const config = configSchema.parse(process.env);

View File

@@ -16,6 +16,7 @@ export const queues = pgTable('queues', {
description: text('description'),
lifecycle_id: uuid('lifecycle_id').references(() => lifecycles.id),
team_id: uuid('team_id').references(() => teams.id, { onDelete: 'set null' }),
mail_alias: text('mail_alias'),
created_at: timestamp('created_at', { withTimezone: true }).defaultNow(),
});

250
src/email/mailtm.ts Normal file
View File

@@ -0,0 +1,250 @@
import { config } from '../config.ts';
import type { InboundEmail } from './types.ts';
import type { EmailProcessor } from './processor.ts';
interface MailTmAccount {
id: string;
address: string;
password: string;
token?: string;
}
interface MailTmMessage {
id: string;
msgid: string;
from: {
address: string;
name: string;
};
to: Array<{
address: string;
name: string;
}>;
subject: string;
text: string;
html: string;
seen: boolean;
createdAt: string;
}
const API_BASE = 'https://api.mail.tm';
/**
* mail.tm transport: creates a disposable inbox, polls for new messages,
* and feeds them to the EmailProcessor.
*/
export class MailTmTransport {
private account: MailTmAccount | null = null;
private processor: EmailProcessor | null = null;
private intervalId: ReturnType<typeof setInterval> | null = null;
private running = false;
constructor(processor: EmailProcessor) {
this.processor = processor;
}
async start(): Promise<string> {
if (this.running) {
return this.account?.address ?? '';
}
this.running = true;
// If account cached in env, reuse it
if (config.MAILTM_ACCOUNT_ID && config.MAILTM_TOKEN) {
this.account = {
id: config.MAILTM_ACCOUNT_ID,
address: config.MAILTM_ADDRESS ?? 'unknown@mail.tm',
password: '', // not needed when reusing token
token: config.MAILTM_TOKEN,
};
console.log(`[mailtm] Reusing cached inbox: ${this.account.address}`);
} else {
this.account = await this.createAccount();
console.log(`[mailtm] Created test inbox: ${this.account.address}`);
console.log(`[mailtm] To persist this inbox, set in .env:`);
console.log(` MAILTM_ADDRESS=${this.account.address}`);
console.log(` MAILTM_ACCOUNT_ID=${this.account.id}`);
console.log(` MAILTM_TOKEN=${this.account.token}`);
}
// Start polling
const seconds = config.MAILTM_POLL_SECONDS;
console.log(`[mailtm] Polling every ${seconds}s — use this address: ${this.account.address}`);
this.poll(); // immediate first poll
this.intervalId = setInterval(() => this.poll(), seconds * 1000);
return this.account.address;
}
stop(): void {
this.running = false;
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
console.log('[mailtm] Stopped');
}
private async createAccount(): Promise<MailTmAccount> {
// Get available domains first (mail.tm rotates domains)
let domain = 'mail.tm';
try {
const domainResp = await fetch(`${API_BASE}/domains`);
if (domainResp.ok) {
const data = await domainResp.json() as { 'hydra:member': Array<{ domain: string }> };
if (data['hydra:member']?.length > 0) {
domain = data['hydra:member'][0]!.domain;
}
}
} catch {
// Fall back to default domain
}
const username = `tessera-${crypto.randomUUID().slice(0, 8)}`;
const password = crypto.randomUUID();
const resp = await fetch(`${API_BASE}/accounts`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ address: `${username}@${domain}`, password }),
});
if (!resp.ok) {
const body = await resp.text().catch(() => '');
throw new Error(`mail.tm account creation failed: HTTP ${resp.status} ${body}`);
}
const account = (await resp.json()) as { id: string; address: string };
// Get token
const tokenResp = await fetch(`${API_BASE}/token`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ address: account.address, password }),
});
if (!tokenResp.ok) {
throw new Error(`mail.tm token request failed: HTTP ${tokenResp.status}`);
}
const tokenData = (await tokenResp.json()) as { token: string };
return {
id: account.id,
address: account.address,
password,
token: tokenData.token,
};
}
private async fetchMessages(): Promise<MailTmMessage[]> {
if (!this.account?.token) return [];
try {
const resp = await fetch(`${API_BASE}/messages`, {
headers: { Authorization: `Bearer ${this.account.token}` },
});
if (!resp.ok) {
// Token might have expired, try to refresh
if (resp.status === 401) {
try {
this.account = await this.createAccount();
return [];
} catch {
console.error('[mailtm] Failed to refresh account');
return [];
}
}
return [];
}
const data = (await resp.json()) as { 'hydra:member': MailTmMessage[] };
return (data['hydra:member'] ?? []).filter((m) => !m.seen);
} catch (err) {
console.error('[mailtm] Error fetching messages:', err instanceof Error ? err.message : String(err));
return [];
}
}
private async getFullMessage(messageId: string): Promise<MailTmMessage | null> {
if (!this.account?.token) return null;
try {
const resp = await fetch(`${API_BASE}/messages/${messageId}`, {
headers: { Authorization: `Bearer ${this.account.token}` },
});
if (!resp.ok) return null;
return (await resp.json()) as MailTmMessage;
} catch {
return null;
}
}
private async markRead(messageId: string): Promise<void> {
if (!this.account?.token) return;
try {
await fetch(`${API_BASE}/messages/${messageId}`, {
method: 'PATCH',
headers: {
Authorization: `Bearer ${this.account.token}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ seen: true }),
});
} catch {
// Best effort
}
}
private async poll(): Promise<void> {
if (!this.running || !this.processor) return;
const messages = await this.fetchMessages();
if (messages.length === 0) return;
console.log(`[mailtm] Found ${messages.length} new message(s)`);
for (const summary of messages) {
const full = await this.getFullMessage(summary.id);
if (!full) continue;
// Build the from display string
const fromName = full.from?.name ?? '';
const fromAddr = full.from?.address ?? '';
const fromDisplay = fromName ? `${fromName} <${fromAddr}>` : fromAddr;
// Build the to display string
const toDisplay = (full.to ?? [])
.map((t) => (t.name ? `${t.name} <${t.address}>` : t.address))
.join(', ');
const inbound: InboundEmail = {
from: fromDisplay,
fromAddress: fromAddr,
to: toDisplay || this.account!.address,
subject: full.subject ?? '(no subject)',
bodyText: full.text ?? '',
bodyHtml: full.html,
attachments: [],
messageId: full.msgid ?? full.id,
receivedAt: new Date(full.createdAt ?? Date.now()),
};
try {
const result = await this.processor.process(inbound);
console.log(`[mailtm] Processed: ${result.action}${result.detail}`);
} catch (err) {
console.error(
`[mailtm] Error processing message ${summary.id}:`,
err instanceof Error ? err.message : String(err),
);
}
await this.markRead(summary.id);
}
}
}

172
src/email/processor.ts Normal file
View File

@@ -0,0 +1,172 @@
import { eq } from 'drizzle-orm';
import type { Db } from '../db/index.ts';
import { tickets, transactions, queues, lifecycles } from '../db/schema.ts';
import { ScripEngine } from '../scrip/engine.ts';
import { LifecycleValidator } from '../lifecycle/validator.ts';
import type { LifecycleDefinition } from '../lifecycle/validator.ts';
import { resolveUser, resolveQueue, matchTicket } from './resolvers.ts';
import type { InboundEmail, ProcessResult } from './types.ts';
/** Tracks recently seen message IDs for dedup */
const seenMessageIds = new Set<string>();
const MAX_SEEN_IDS = 2000;
function isSeen(messageId: string): boolean {
if (seenMessageIds.has(messageId)) return true;
seenMessageIds.add(messageId);
// Prune oldest entries if set grows too large
if (seenMessageIds.size > MAX_SEEN_IDS) {
const iter = seenMessageIds.values();
for (let i = 0; i < 500; i++) {
const { value, done } = iter.next();
if (done) break;
seenMessageIds.delete(value!);
}
}
return false;
}
/**
* Extract the plain from-address from a From header.
* Handles "Alice <alice@example.com>" and plain "alice@example.com".
*/
function parseAddress(raw: string): string {
const match = raw.match(/<([^>]+)>/);
if (match) return match[1]!.trim().toLowerCase();
return raw.trim().toLowerCase();
}
export class EmailProcessor {
private db: Db;
private scripEngine: ScripEngine;
private lifecycleValidator: LifecycleValidator;
constructor(db: Db) {
this.db = db;
this.scripEngine = new ScripEngine(db);
this.lifecycleValidator = new LifecycleValidator();
}
async process(email: InboundEmail): Promise<ProcessResult> {
// Dedup by messageId
if (isSeen(email.messageId)) {
console.log(`[email] Skipping duplicate message: ${email.messageId}`);
return { action: 'skipped', detail: `Duplicate messageId: ${email.messageId}` };
}
const fromAddress = parseAddress(email.from);
const toAddress = parseAddress(email.to);
// Resolve user
const { userId, isNew } = await resolveUser(this.db, fromAddress);
if (isNew) {
console.log(`[email] Created stub user for ${fromAddress}`);
}
// Resolve queue
const { queueId, queueName } = await resolveQueue(this.db, toAddress);
console.log(`[email] Routed to queue "${queueName}"`);
// Match or create ticket
const matchedTicket = await matchTicket(this.db, email.subject);
if (matchedTicket) {
// Reply: add Correspond transaction
const [tx] = await this.db
.insert(transactions)
.values({
ticket_id: matchedTicket.id,
transaction_type: 'Correspond',
data: {
body: email.bodyText || email.bodyHtml || '(no body)',
from: fromAddress,
message_id: email.messageId,
},
creator_id: userId,
})
.returning();
if (!tx) {
return { action: 'skipped', detail: 'Failed to create transaction' };
}
// Update ticket timestamp
await this.db
.update(tickets)
.set({ updated_at: new Date() } as any)
.where(eq(tickets.id, matchedTicket.id));
// Fire scrips (e.g. OnCorrespond → auto-reply)
const prepared = await this.scripEngine.prepare(matchedTicket.id, [tx] as any);
const results = await this.scripEngine.commit(prepared);
console.log(
`[email] Reply on ticket ${matchedTicket.id}, scrips: ${results.length} (${results.map((r) => r.message).join(', ')})`,
);
return { action: 'replied', ticketId: matchedTicket.id, detail: `Correspond on ticket ${matchedTicket.id}` };
}
// New ticket
const queue = await this.db.query.queues.findFirst({
where: eq(queues.id, queueId),
});
let initialStatus = 'new';
if (queue?.lifecycle_id) {
const lifecycle = await this.db.query.lifecycles.findFirst({
where: eq(lifecycles.id, queue.lifecycle_id),
});
const definition = lifecycle?.definition as LifecycleDefinition | undefined;
initialStatus = definition?.statuses.initial[0] ?? initialStatus;
}
const [ticket] = await this.db
.insert(tickets)
.values({
subject: email.subject,
queue_id: queueId,
status: initialStatus,
creator_id: userId,
team_id: (queue as any)?.team_id ?? null,
})
.returning();
if (!ticket) {
return { action: 'skipped', detail: 'Failed to create ticket' };
}
// Create transaction + correspond in one batch
const txList = [
{
ticket_id: ticket.id,
transaction_type: 'Create',
field: 'status',
new_value: initialStatus,
creator_id: userId,
},
{
ticket_id: ticket.id,
transaction_type: 'Correspond',
field: null,
new_value: null,
data: {
body: email.bodyText || email.bodyHtml || '(no body)',
from: fromAddress,
message_id: email.messageId,
},
creator_id: userId,
},
];
await this.db.insert(transactions).values(txList as any);
// Fire scrips on TransactionBatch (OnCreate + OnCorrespond)
const prepared = await this.scripEngine.prepare(ticket.id, txList as any, 'TransactionBatch');
const results = await this.scripEngine.commit(prepared);
console.log(
`[email] Created ticket ${ticket.id} in "${queueName}", scrips: ${results.length} (${results.map((r) => r.message).join(', ')})`,
);
return { action: 'created', ticketId: ticket.id, detail: `Ticket ${ticket.id} created in ${queueName}` };
}
}

128
src/email/resolvers.ts Normal file
View File

@@ -0,0 +1,128 @@
import { eq, ilike, and } from 'drizzle-orm';
import type { Db } from '../db/index.ts';
import { users, queues, tickets } from '../db/schema.ts';
/**
* Resolve a sender email address to a user record.
* Creates a stub "unverified" user if no match is found.
*/
export async function resolveUser(
db: Db,
fromAddress: string,
): Promise<{ userId: string; isNew: boolean }> {
// Try exact match first
const existing = await db.query.users.findFirst({
where: eq(users.email, fromAddress),
});
if (existing) {
return { userId: existing.id, isNew: false };
}
// Create a stub user with role 'unverified'
const username = fromAddress.replace(/@/g, '-at-').replace(/[^a-zA-Z0-9._-]/g, '');
const [stub] = await db
.insert(users)
.values({
username,
email: fromAddress,
role: 'unverified',
})
.returning();
if (!stub) {
throw new Error(`Failed to create stub user for ${fromAddress}`);
}
return { userId: stub.id, isNew: true };
}
/**
* Resolve the "to" address to a queue.
* Strategy:
* 1. Check for exact mail_alias match on any queue
* 2. Check if the local-part matches a queue name
* 3. Fallback to the first available queue
*/
export async function resolveQueue(
db: Db,
toAddress: string,
): Promise<{ queueId: string; queueName: string }> {
const localPart = toAddress.split('@')[0]?.toLowerCase() ?? '';
const fullAddress = toAddress.trim().toLowerCase();
// 1. Check mail_alias exact match
if (fullAddress) {
const byAlias = await db.query.queues.findFirst({
where: eq(queues.mail_alias, fullAddress),
});
if (byAlias) {
return { queueId: byAlias.id, queueName: byAlias.name };
}
}
// 2. Check queue name matching the local-part
if (localPart) {
const byName = await db.query.queues.findFirst({
where: eq(queues.name, localPart),
});
if (byName) {
return { queueId: byName.id, queueName: byName.name };
}
}
// 3. Fallback to first queue
const allQueues = await db.query.queues.findMany({ limit: 1 });
const fallback = allQueues[0];
if (fallback) {
return { queueId: fallback.id, queueName: fallback.name };
}
throw new Error('No queues exist — cannot route inbound email');
}
/**
* Scan subject for ticket ID patterns.
* Supports: TKT-XXXX, TKT-XXXXX, #NNN, [queue-name #NNN]
*
* Returns the matched ticket if found and not in a closed state,
* or null if no ticket was matched.
*/
export async function matchTicket(
db: Db,
subject: string,
): Promise<{ id: number; subject: string; status: string } | null> {
const trimmed = subject.trim();
// TKT-XXXX or TKT-XXXXX (Tessera display format)
const tktMatch = trimmed.match(/\bTKT-(\d{4,5})\b/i);
if (tktMatch) {
const id = parseInt(tktMatch[1]!, 10);
if (!isNaN(id)) {
const ticket = await db.query.tickets.findFirst({ where: eq(tickets.id, id) });
if (ticket) return { id: ticket.id, subject: ticket.subject, status: ticket.status };
}
}
// [queue-name #NNN] (RT-compatible bracket format)
const bracketMatch = trimmed.match(/\[[\w-]+\s*#(\d+)\]/i);
if (bracketMatch) {
const id = parseInt(bracketMatch[1]!, 10);
if (!isNaN(id)) {
const ticket = await db.query.tickets.findFirst({ where: eq(tickets.id, id) });
if (ticket) return { id: ticket.id, subject: ticket.subject, status: ticket.status };
}
}
// #NNN (shorthand, requires word boundary before #)
const hashMatch = trimmed.match(/(?:^|\s)#(\d+)\b/);
if (hashMatch) {
const id = parseInt(hashMatch[1]!, 10);
if (!isNaN(id)) {
const ticket = await db.query.tickets.findFirst({ where: eq(tickets.id, id) });
if (ticket) return { id: ticket.id, subject: ticket.subject, status: ticket.status };
}
}
return null;
}

24
src/email/types.ts Normal file
View File

@@ -0,0 +1,24 @@
export interface InboundEmailAttachment {
filename: string;
mimeType: string;
content: Buffer;
}
export interface InboundEmail {
from: string; // "Alice <alice@example.com>"
fromAddress: string; // "alice@example.com"
to: string; // "support@mail.tm"
subject: string;
bodyText: string;
bodyHtml?: string;
attachments: InboundEmailAttachment[];
messageId: string; // for dedup
receivedAt: Date;
}
/** Result of processing one inbound email */
export interface ProcessResult {
action: 'created' | 'replied' | 'skipped';
ticketId?: number;
detail: string;
}

View File

@@ -21,7 +21,10 @@ import { createAuthRouter } from './routes/auth.ts';
import { createQueuePermissionsRouter } from './routes/queue-permissions.ts';
import { createSlaPoliciesRouter } from './routes/sla-policies.ts';
import { createNotificationsRouter } from './routes/notifications.ts';
import { createMailgateRouter } from './routes/mailgate.ts';
import { startScheduler } from './scrip/scheduler.ts';
import { EmailProcessor } from './email/processor.ts';
import { MailTmTransport } from './email/mailtm.ts';
let db: Db | null = null;
@@ -39,10 +42,16 @@ app.onError(errorHandler);
const { requireAuth, requireAdmin } = createAuthMiddleware(getDb());
// Email processor (shared between transport and webhook)
const emailProcessor = new EmailProcessor(getDb());
// Public routes
app.route('/health', healthRouter);
app.route('/', createAuthRouter(getDb()));
// Mailgate webhook — public endpoint for receiving inbound emails
app.route('/mailgate', createMailgateRouter(getDb(), emailProcessor));
// Ticket routes — require authentication
const ticketsWithAuth = new Hono();
ticketsWithAuth.use('*', requireAuth);
@@ -87,4 +96,12 @@ if (Bun.main === import.meta.path) {
// Start the scrip scheduler (runs every 5 minutes)
startScheduler(getDb());
// Start inbound email transport
if (config.MAIL_TRANSPORT === 'mailtm') {
const transport = new MailTmTransport(emailProcessor);
transport.start().catch((err) => {
console.error('[email] Failed to start mail.tm transport:', err instanceof Error ? err.message : String(err));
});
}
}

69
src/routes/mailgate.ts Normal file
View File

@@ -0,0 +1,69 @@
import { Hono } from 'hono';
import { HTTPException } from 'hono/http-exception';
import { z } from 'zod/v4';
import type { Db } from '../db/index.ts';
import type { InboundEmail } from '../email/types.ts';
import type { EmailProcessor } from '../email/processor.ts';
const WebhookPayloadSchema = z.object({
from: z.string().min(1),
to: z.string().min(1),
subject: z.string().min(1),
body_text: z.string().optional().default(''),
body_html: z.string().optional(),
message_id: z.string().optional(),
attachments: z
.array(
z.object({
filename: z.string(),
mime_type: z.string().optional().default('application/octet-stream'),
content_base64: z.string(),
}),
)
.optional()
.default([]),
});
/**
* POST /mailgate — webhook endpoint for receiving inbound emails.
*
* Accepts JSON payload from external mail services (SendGrid, Mailgun, etc.).
* When MAIL_TRANSPORT is 'webhook', this is the primary inbound path.
* When MAIL_TRANSPORT is 'mailtm' or 'none', it's still available as a
* secondary path (useful for testing or hybrid setups).
*/
export function createMailgateRouter(db: Db, processor: EmailProcessor): Hono {
const router = new Hono();
router.post('/', async (c) => {
const body = await c.req.json();
const parsed = WebhookPayloadSchema.parse(body);
const inbound: InboundEmail = {
from: parsed.from,
fromAddress: extractAddress(parsed.from),
to: parsed.to,
subject: parsed.subject,
bodyText: parsed.body_text,
bodyHtml: parsed.body_html,
messageId: parsed.message_id ?? `${Date.now()}-${crypto.randomUUID()}`,
receivedAt: new Date(),
attachments: parsed.attachments.map((att) => ({
filename: att.filename,
mimeType: att.mime_type,
content: Buffer.from(att.content_base64, 'base64'),
})),
};
const result = await processor.process(inbound);
return c.json(result, result.action === 'skipped' ? 200 : 201);
});
return router;
}
function extractAddress(raw: string): string {
const match = raw.match(/<([^>]+)>/);
if (match) return match[1]!.trim().toLowerCase();
return raw.trim().toLowerCase();
}