import { _n } from "../../../../client-exports"; import getQueue from "../../../../package-shared/functions/backend/queues/get-queue"; import updateQueue from "../../../../package-shared/functions/backend/queues/update-queue"; import { DSQL_DATASQUIREL_PROCESS_QUEUE } from "../../../../package-shared/types/dsql"; import debugLog from "../../../../package-shared/utils/logging/debug-log"; import { QueueJobTypes } from "../../../../types"; function debugLogFn(log: any, label?: string) { debugLog({ log, addTime: true, label, title: __filename.split("/").pop() }); } export default async function cronHandleQueue() { const INTERVAL = 5000; while (true) { await (async () => { const lastQueueItemRes = (await getQueue({ query: { query: { error: { value: "0", }, running: { value: "0", }, success: { value: "0", }, }, order: { field: "id", strategy: "ASC", }, limit: 1, }, })) as DSQL_DATASQUIREL_PROCESS_QUEUE[] | undefined; const lastQueueItem = lastQueueItemRes?.[0]; if (!lastQueueItem) return; debugLogFn(lastQueueItem.title, "Running Queue"); await updateQueue({ queueId: _n(lastQueueItem.id), queue: { running: 1, }, }); const queueJobType: (typeof QueueJobTypes)[number] = lastQueueItem.job_type as (typeof QueueJobTypes)[number]; try { switch (queueJobType) { /** * # Dummy Queue */ case "dummy": await Bun.sleep(20000); break; /** * # Unhandled */ default: return; } } catch (error: any) { debugLogFn(error.message, "ERROR"); await updateQueue({ queueId: _n(lastQueueItem.id), queue: { running: 0, error: 1, error_message: String(error.message), }, }); return; } await updateQueue({ queueId: _n(lastQueueItem.id), queue: { success: 1, }, }); })(); await Bun.sleep(INTERVAL); } }