Files
brain/tools/secretary-queue.mjs
T

42 lines
2.1 KiB
JavaScript

// tools/secretary-queue.mjs
// Очередь спанов и курсор обработки темы на диске (<workDir>/_worker/). Дедуп по span.index.
// Курсор — наибольший ПОЛНОСТЬЮ обработанный индекс; пишет только воркер. Все записи атомарны.
import { readFileSync, mkdirSync } from 'node:fs';
import { join } from 'node:path';
import { writeFileAtomic } from './secretary-layer1.mjs';
function wdir(workDir) { const d = join(workDir, '_worker'); mkdirSync(d, { recursive: true }); return d; }
function qpath(workDir) { return join(wdir(workDir), 'queue.json'); }
function cpath(workDir) { return join(wdir(workDir), 'cursor.json'); }
function readQueue(workDir) {
try { const a = JSON.parse(readFileSync(qpath(workDir), 'utf-8')); return Array.isArray(a) ? a : []; }
catch { return []; }
}
function writeQueue(workDir, arr) { writeFileAtomic(qpath(workDir), JSON.stringify(arr)); }
/** Поставить спан в очередь. Дедуп по span.index — повторная постановка того же спана игнорируется. */
export function enqueueSpan(workDir, job) {
const arr = readQueue(workDir);
const idx = job && job.span ? job.span.index : undefined;
if (arr.some((j) => j.span && j.span.index === idx)) return;
arr.push(job);
writeQueue(workDir, arr);
}
/** Снять первый спан (FIFO). null если очередь пуста. */
export function dequeueSpan(workDir) {
const arr = readQueue(workDir);
if (!arr.length) return null;
const first = arr.shift();
writeQueue(workDir, arr);
return first;
}
/** Курсор обработки темы (наибольший обработанный индекс). -1 = ничего не обработано. */
export function readCursor(workDir) {
try { const v = JSON.parse(readFileSync(cpath(workDir), 'utf-8')); return Number.isFinite(v.index) ? v.index : -1; }
catch { return -1; }
}
export function writeCursor(workDir, index) { writeFileAtomic(cpath(workDir), JSON.stringify({ index })); }