All files / lib/whatsapp queue-handler.ts

100% Statements 23/23
100% Branches 13/13
100% Functions 1/1
100% Lines 23/23

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57                      9x 1x 1x 2x   1x     8x 8x           8x 8x   8x 10x 10x   10x         1x 1x 9x 8x 6x   8x 5x       8x   2x 2x        
import type { WhatsAppQueueMessage } from "./types";
import { createWhatsAppClient } from "./client";
import { getDb } from "../../db";
import { createDal } from "../../dal";
import { CampaignService } from "../../services/whatsapp/campaign.service";
import { getWhatsAppIds } from "../env-config";
 
export async function handleWhatsAppQueue(
	batch: MessageBatch<WhatsAppQueueMessage>,
	env: CloudflareBindings,
) {
	if (!env.WHATSAPP_ACCESS_TOKEN) {
		console.error("[WA_QUEUE] WhatsApp not configured, skipping batch");
		for (const msg of batch.messages) {
			msg.ack();
		}
		return;
	}
 
	const { phoneNumberId, wabaId } = getWhatsAppIds();
	const client = createWhatsAppClient({
		phoneNumberId,
		accessToken: env.WHATSAPP_ACCESS_TOKEN,
		wabaId,
		environment: env.ENVIRONMENT,
	});
	const db = getDb(env.DB);
	const dal = createDal(db);
 
	for (const msg of batch.messages) {
		try {
			const data = msg.body;
 
			if (
				data.type === "campaign_send" &&
				data.campaignId &&
				data.recipientId
			) {
				const campaignService = new CampaignService(dal);
				await campaignService.processQueueMessage(data, client);
			} else if (data.type === "reply") {
				const result = await client.sendRaw(data.payload);
				const wamid = result.messages[0]?.id;
 
				if (data.conversationId && wamid) {
					await dal.waMessages.updateStatus(wamid, "sent");
				}
			}
 
			msg.ack();
		} catch (error) {
			console.error("[WA_QUEUE] Processing error:", error);
			msg.retry();
		}
	}
}