From be336f09deb4e28e9bcff87763070bc30580b2d6 Mon Sep 17 00:00:00 2001
From: Felix Kaspar
Date: Thu, 19 Oct 2023 21:23:58 +0200
Subject: [PATCH] api event-stream

---
 routes/api/workflow.js | 68 ++++++++++++++++++++++++++++++++++--------
 traverseOperations.js  | 24 ++++++++++-----
 2 files changed, 72 insertions(+), 20 deletions(-)

diff --git a/routes/api/workflow.js b/routes/api/workflow.js
index 9f6c0190..cef23375 100644
--- a/routes/api/workflow.js
+++ b/routes/api/workflow.js
@@ -9,7 +9,7 @@ const activeWorkflows = {};
 
 const router = express.Router();
 
-router.post("/", [
+router.post("/:workflowUuid?", [
     multer().array("files"),
     async (req, res, next) => {
         const workflow = JSON.parse(req.body.workflow);
@@ -18,10 +18,7 @@ router.post("/", [
 
         // TODO: Validate
 
-        let workflowID = undefined;
-
         const inputs = await Promise.all(req.files.map(async file => {
-            console.log(file);
             return {
                 originalFileName: file.originalname.replace(/\.[^/.]+$/, ""),
                 fileName: file.originalname.replace(/\.[^/.]+$/, ""),
@@ -33,11 +30,26 @@ router.post("/", [
         if(req.body.async === "false") {
             console.log("Don't do async");
 
-            const pdfResults = await traverseOperations(workflow.operations, inputs);
+            const traverse = traverseOperations(workflow.operations, inputs);
+
+            let pdfResults;
+            let iteration;
+            while (true) {
+                iteration = await traverse.next();
+                if (iteration.done) {
+                    pdfResults = iteration.value;
+                    break;
+                }
+            }
+
             downloadHandler(res, pdfResults);
         }
         else {
-            workflowID = generateWorkflowID();
+            // TODO: UUID collision checks
+            let workflowID = req.params.workflowUuid;
+            if(!workflowID)
+                workflowID = generateWorkflowID();
+
             activeWorkflows[workflowID] = {
                 createdAt: Date.now(),
                 finished: false,
@@ -45,6 +57,7 @@ router.post("/", [
                 result: null,
                 // TODO: When auth is implemented: owner
             }
+            const activeWorkflow = activeWorkflows[workflowID];
 
             res.status(501).json({
                 "warning": "Unfinished Endpoint",
@@ -55,11 +68,26 @@ router.post("/", [
                 }
             });
 
-            traverseOperations(workflow.operations, inputs).then((pdfResults) => {
-                activeWorkflows[workflowID].result = pdfResults;
-                activeWorkflows[workflowID].finished = true;
-                // TODO: Post to eventStream
-            });
+            const traverse = traverseOperations(workflow.operations, inputs);
+
+            let pdfResults;
+            let iteration;
+            while (true) {
+                iteration = await traverse.next();
+                if (iteration.done) {
+                    pdfResults = iteration.value;
+                    if(activeWorkflow.eventStream) {
+                        activeWorkflow.eventStream.write(`data: processing done\n\n`);
+                        activeWorkflow.eventStream.end();
+                    }
+                    break;
+                }
+                if(activeWorkflow.eventStream)
+                    activeWorkflow.eventStream.write(`data: ${iteration.value}\n\n`);
+            }
+
+            activeWorkflow.result = pdfResults;
+            activeWorkflow.finished = true;
         }
     }
 ]);
@@ -73,8 +101,22 @@ router.get("/progress/:workflowUuid", (req, res, nex) => {
 });
 
 router.get("/progress-stream/:workflowUuid", (req, res, nex) => {
-    // TODO: Send realtime updates
-    res.status(501).json({"warning": "Event-Stream has not been implemented yet."});
+    // TODO: Validation
+
+    // Send realtime updates
+    res.setHeader('Cache-Control', 'no-cache');
+    res.setHeader('Content-Type', 'text/event-stream');
+    res.setHeader('Access-Control-Allow-Origin', '*');
+    res.setHeader('Connection', 'keep-alive');
+    res.flushHeaders(); // flush the headers to establish SSE with client
+
+    const workflow = activeWorkflows[req.params.workflowUuid];
+    workflow.eventStream = res;
+
+    res.on('close', () => {
+        res.end();
+        // TODO: Abort if not already done?
+    });
 });
 
 router.get("/result/:workflowUuid", (req, res, nex) => {
diff --git a/traverseOperations.js b/traverseOperations.js
index d43ddf13..818e8758 100644
--- a/traverseOperations.js
+++ b/traverseOperations.js
@@ -5,13 +5,16 @@ import { rotatePages } from "./functions/rotatePDF.js";
 import { splitPDF } from "./functions/splitPDF.js";
 import { organizeWaitOperations } from "./public/organizeWaitOperations.js";
 
-export async function traverseOperations(operations, input) {
+export async function * traverseOperations(operations, input) {
     const waitOperations = organizeWaitOperations(operations);
     const results = [];
-    await nextOperation(operations, input);
+    for await (const value of nextOperation(operations, input)) {
+        yield value;
+    }
     return results;
 
-    async function nextOperation(operations, input) {
+    // TODO: Put all nextOperation() calls in the for await, like for "extract"
+    async function * nextOperation(operations, input) {
         if(Array.isArray(operations) && operations.length == 0) { // isEmpty
             console.log("operation done: " + input.fileName);
             results.push(input);
@@ -19,11 +22,13 @@
         }
 
         for (let i = 0; i < operations.length; i++) {
-            await computeOperation(operations[i], structuredClone(input)); // break references
+            for await (const value of computeOperation(operations[i], structuredClone(input))) {
+                yield value;
+            }
         }
     }
 
-    async function computeOperation(operation, input) {
+    async function * computeOperation(operation, input) {
         switch (operation.type) {
             case "done":
                 console.log("Done operation will get called if all waits are done. Skipping for now.")
@@ -57,13 +62,17 @@
                 for (let i = 0; i < input.length; i++) {
                     input[i].fileName += "_extractedPages";
                     input[i].buffer = await extractPages(input[i].buffer, operation.values["pagesToExtractArray"]);
-                    await nextOperation(operation.operations, input[i]);
+                    for await (const value of nextOperation(operation.operations, input[i])) {
+                        yield value;
+                    }
                 }
             }
             else {
                 input.fileName += "_extractedPages";
                 input.buffer = await extractPages(input.buffer, operation.values["pagesToExtractArray"]);
-                await nextOperation(operation.operations, input);
+                for await (const value of nextOperation(operation.operations, input)) {
+                    yield value;
+                }
             }
             break;
         case "split":
@@ -202,5 +211,6 @@
                 console.log("operation type unknown: ", operation.type);
                 break;
         }
+        yield operation.type;
     }
 }
\ No newline at end of file
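
The new progress-stream endpoint emits one SSE message per operation type yielded by traverseOperations(), followed by a final "processing done" event. A minimal client-side consumption sketch, not part of the patch, is shown below; the "/api/workflow" mount prefix and the workflowUuid value are assumptions, since the router's mount point and the POST response shape are not shown in this diff.

// Hypothetical consumer sketch, assuming the router is mounted at /api/workflow
// and that workflowUuid was obtained from the earlier async POST request.
const workflowUuid = "REPLACE_WITH_RETURNED_UUID";
const source = new EventSource(`/api/workflow/progress-stream/${workflowUuid}`);

source.onmessage = (event) => {
    // Each message carries the operation type yielded by traverseOperations(),
    // e.g. "extract" or "split"; the final message is "processing done".
    console.log("progress:", event.data);
    if (event.data === "processing done") {
        source.close();
        // The finished output can then be fetched from the result endpoint
        // (GET /api/workflow/result/<uuid> under the assumed mount prefix).
    }
};

source.onerror = () => source.close(); // stop EventSource from auto-reconnecting once the stream ends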