/**
* Analytics Sync Job
*
* Daily cron job that aggregates Analytics Engine data to D1.
* Runs at 2:00 AM IST (20:30 UTC previous day)
*
* Flow:
* 1. Get yesterday's date in IST
* 2. Query Analytics Engine for pro stats via SQL API
* 3. Query Analytics Engine for project stats
* 4. Query Analytics Engine for sources/devices/cities breakdowns
* 5. Write to analytics_pro_daily and analytics_project_daily
* 6. Update analytics_pro_summary with rolling totals
*
* Analytics Engine SQL API:
* - Endpoint: https://api.cloudflare.com/client/v4/accounts/{account_id}/analytics_engine/sql
* - Requires: CLOUDFLARE_ACCOUNT_ID and CLOUDFLARE_API_TOKEN environment variables
* - Blob mapping follows the spec in the analytics implementation document
*/
// Re-export types
export * from "./types";
// Re-export date helpers
export { getYesterdayIST, getDateRangeUTC } from "./date-helpers";
// Re-export query functions
export { queryProStats, queryProjectStats } from "./queries";
// Re-export writer functions
export {
writeProDaily,
writeProjectDaily,
updateProSummaries,
} from "./writers";
// Import for main function
import type { AnalyticsSyncEnv } from "./types";
import { getYesterdayIST } from "./date-helpers";
import { queryProStats, queryProjectStats } from "./queries";
import {
writeProDaily,
writeProjectDaily,
updateProSummaries,
} from "./writers";
import { createDualCache, MARKETPLACE } from "../cache";
/**
* Main sync function - orchestrates the entire daily aggregation process
* Called by the scheduled cron trigger
*/
export async function syncAnalytics(env: AnalyticsSyncEnv): Promise<void> {
const date = getYesterdayIST();
console.log(`[Analytics Sync] Starting for date: ${date}`);
// Check required environment variables
const accountId = env.CLOUDFLARE_ACCOUNT_ID;
const apiToken = env.CLOUDFLARE_API_TOKEN;
const dataset = env.ANALYTICS_DATASET || "dr-local-events";
if (!accountId || !apiToken) {
console.log(
"[Analytics Sync] Skipping - CLOUDFLARE_ACCOUNT_ID and CLOUDFLARE_API_TOKEN not configured",
);
return;
}
try {
// Step 1: Query Analytics Engine for pro stats
console.log("[Analytics Sync] Querying pro stats...");
const proStats = await queryProStats(
accountId,
apiToken,
dataset,
date,
);
console.log(
`[Analytics Sync] Found ${proStats.length} pros with activity`,
);
// Step 2: Query Analytics Engine for project stats
console.log("[Analytics Sync] Querying project stats...");
const projectStats = await queryProjectStats(
accountId,
apiToken,
dataset,
date,
);
console.log(
`[Analytics Sync] Found ${projectStats.length} projects with activity`,
);
// Step 3: Filter out IDs not present in D1 (Analytics Engine may
// contain events for deleted pros/projects — writing those would
// violate FK constraints on analytics_pro_daily / analytics_project_daily)
const validProIds = await getValidIds(env.DB, "pros");
const validProjectIds = await getValidIds(env.DB, "projects");
const filteredProStats = proStats.filter((s) =>
validProIds.has(s.pro_id),
);
const filteredProjectStats = projectStats.filter(
(s) =>
validProjectIds.has(s.project_id) &&
validProIds.has(s.pro_id),
);
const skippedPros = proStats.length - filteredProStats.length;
const skippedProjects =
projectStats.length - filteredProjectStats.length;
if (skippedPros > 0 || skippedProjects > 0) {
console.log(
`[Analytics Sync] Skipped ${skippedPros} pro rows and ${skippedProjects} project rows (stale IDs not in D1)`,
);
}
// Step 4: Write pro daily stats to D1 (if any)
if (filteredProStats.length > 0) {
console.log("[Analytics Sync] Writing pro daily stats...");
await writeProDaily(env.DB, filteredProStats, date);
}
// Step 5: Write project daily stats to D1 (if any)
if (filteredProjectStats.length > 0) {
console.log("[Analytics Sync] Writing project daily stats...");
await writeProjectDaily(env.DB, filteredProjectStats, date);
}
// Step 6: Update pro summary table
console.log("[Analytics Sync] Updating pro summaries...");
await updateProSummaries(env.DB);
// Step 7: Invalidate cached stats so dashboards show fresh data
if (filteredProStats.length > 0) {
const cache = createDualCache(env.KV_CACHE);
await Promise.all(
filteredProStats.map((v) =>
cache.delete(MARKETPLACE.proStats(v.pro_id)),
),
);
}
console.log(
`[Analytics Sync] Complete. Pros: ${filteredProStats.length}, Projects: ${filteredProjectStats.length}`,
);
} catch (error) {
console.error("[Analytics Sync] Error:", error);
throw error;
}
}
/**
 * Load every ID from the given table into a Set for O(1) membership checks.
 * Used to drop stale Analytics Engine events (for since-deleted pros or
 * projects) before writing daily rows to D1.
 */
export async function getValidIds(
  db: D1Database,
  table: "pros" | "projects",
): Promise<Set<string>> {
  // The table name is restricted to a two-member union, so picking the
  // literal SQL string here is safe (no dynamic interpolation).
  const sql =
    table === "pros" ? "SELECT id FROM pros" : "SELECT id FROM projects";
  const { results } = await db.prepare(sql).all<{ id: string }>();
  const ids = new Set<string>();
  for (const row of results) {
    ids.add(row.id);
  }
  return ids;
}
|