All files / lib/communication queue-consumer.ts

95.23% Statements 20/21
85.71% Branches 12/14
100% Functions 1/1
95.23% Lines 20/21

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61                            9x 9x 9x 9x 9x   9x 11x 11x   11x 1x 10x 8x   2x           9x                   9x   7x 2x       2x     2x 2x        
import { getDb } from "../../db";
import { createDal } from "../../dal";
import { EmailAdapter } from "./adapters/email.adapter";
import { SmsAdapter } from "./adapters/sms.adapter";
import { WhatsAppAdapter } from "./adapters/whatsapp.adapter";
import type {
	AdapterResult,
	CommunicationQueueMessage,
} from "./types";
 
export async function handleCommunicationQueue(
	batch: MessageBatch<CommunicationQueueMessage>,
	env: CloudflareBindings,
): Promise<void> {
	const db = getDb(env.DB);
	const dal = createDal(db);
	const emailAdapter = new EmailAdapter();
	const smsAdapter = new SmsAdapter();
	const whatsAppAdapter = new WhatsAppAdapter();
 
	for (const msg of batch.messages) {
		try {
			const send = msg.body as CommunicationQueueMessage;
			let result: AdapterResult | undefined;
			if (send.channel === "sms") {
				result = await smsAdapter.send(send, env);
			} else if (send.channel === "email") {
				result = await emailAdapter.send(send, env);
			} else {
				result = await whatsAppAdapter.send(send, env, dal);
			}
 
			// Update communication_log with delivery result. Note: attribute
			// skipped-due-to-suppression as 'skipped' (NOT 'failed') so retries
			// don't fire and counters don't inflate.
			await dal.communicationLog.updateStatus(send.logId, {
				status: result.status === "skipped" ? "skipped" : result.status,
				actualRecipient: result.actualRecipient,
				provider: result.provider,
				externalId: result.externalId,
				previewHtml: result.previewHtml,
				previewText: result.previewText,
				errorMessage: result.errorMessage,
			});
 
			if (result.status === "sent" || result.status === "skipped") {
				// Skipped = compliance decision; NOT a transient failure. Ack.
				msg.ack();
			I} else if (result.errorCode === "policy" || result.errorCode === "template_rejected") {
				// Permanent failures: don't retry.
				msg.ack();
			} else {
				msg.retry();
			}
		} catch (error) {
			console.error("[COMMUNICATION] Queue processing error:", error);
			msg.retry();
		}
	}
}