dsql-admin/dsql-app/docker/cron/(functions)/queue/handle-queue.ts

89 lines
2.6 KiB
TypeScript
Raw Normal View History

2025-02-16 16:12:40 +00:00
import { _n } from "@/client-exports";
import getQueue from "@/package-shared/functions/backend/queues/get-queue";
import updateQueue from "@/package-shared/functions/backend/queues/update-queue";
import { DSQL_DATASQUIREL_PROCESS_QUEUE } from "@/package-shared/types/dsql";
import debugLog from "@/package-shared/utils/logging/debug-log";
/**
 * Thin wrapper around `debugLog` for this module.
 *
 * Passes the value through with `addTime` enabled, the optional label, and a
 * title taken from the last "/"-separated segment of `__filename`.
 *
 * @param log - Value to log.
 * @param label - Optional label forwarded to `debugLog`.
 */
function debugLogFn(log: any, label?: string) {
    const title = __filename.split("/").pop();
    debugLog({ log, addTime: true, label, title });
}
/**
 * Fetches the oldest queue item that is neither errored, running, nor
 * successful (lowest `id` first), flags it as running, executes the handler
 * for its `job_type`, and records the outcome on the queue row.
 *
 * Resolves without doing anything when no pending item is found. A failing or
 * unhandled job is recorded as an error on the row rather than thrown;
 * rejections from `getQueue` / `updateQueue` themselves propagate to the
 * caller.
 */
async function processNextQueueItem(): Promise<void> {
    const pendingItems = (await getQueue({
        query: {
            query: {
                error: { value: "0" },
                running: { value: "0" },
                success: { value: "0" },
            },
            order: { field: "id", strategy: "ASC" },
            limit: 1,
        },
    })) as DSQL_DATASQUIREL_PROCESS_QUEUE[] | undefined;

    const queueItem = pendingItems?.[0];
    if (!queueItem) return;

    const queueId = _n(queueItem.id);

    debugLogFn(queueItem.title, "Running Queue");

    // Flag the item as running before doing the work; the pending query above
    // filters on `running = 0`, so a flagged item is not fetched again.
    await updateQueue({ queueId, queue: { running: 1 } });

    try {
        switch (queueItem.job_type) {
            /**
             * # Dummy Queue
             */
            case "dummy":
                await Bun.sleep(20000);
                break;

            /**
             * # Unhandled
             *
             * Throw so the item is recorded as errored below instead of being
             * left flagged as running forever.
             */
            default:
                throw new Error(
                    `Unhandled job type: ${String(queueItem.job_type)}`
                );
        }
    } catch (error: unknown) {
        // Anything can be thrown, so narrow before reading `.message`.
        const message = error instanceof Error ? error.message : String(error);

        debugLogFn(message, "ERROR");

        await updateQueue({
            queueId,
            queue: {
                running: 0,
                error: 1,
                error_message: message,
            },
        });
        return;
    }

    // Clear the running flag together with marking success so a finished item
    // is not reported as both running and successful.
    await updateQueue({
        queueId,
        queue: {
            running: 0,
            success: 1,
        },
    });
}

/**
 * Queue worker loop: processes at most one pending queue item per iteration,
 * then sleeps for `INTERVAL` milliseconds before polling again.
 *
 * The loop never exits, so the returned promise does not resolve. An error
 * escaping a single iteration (e.g. a failed queue read or update) is logged
 * and the loop continues with the next poll instead of terminating the worker.
 */
export default async function cronHandleQueue() {
    const INTERVAL = 5000;

    while (true) {
        try {
            await processNextQueueItem();
        } catch (error: unknown) {
            debugLogFn(
                error instanceof Error ? error.message : String(error),
                "ERROR"
            );
        }

        await Bun.sleep(INTERVAL);
    }
}