editor revamp, complete change

This commit is contained in:
Anthony Stirling
2025-11-02 21:00:03 +00:00
parent ec0ae36a82
commit bbcb23ca11
25 changed files with 3747 additions and 1021 deletions

View File

@@ -148,17 +148,31 @@ public class JobExecutorService {
taskManager.createTask(jobId);
// Create a specialized wrapper that updates the TaskManager
final String capturedJobIdForQueue = jobId;
Supplier<Object> wrappedWork =
() -> {
try {
// Set jobId in ThreadLocal context for the queued job
stirling.software.common.util.JobContext.setJobId(
capturedJobIdForQueue);
log.debug(
"Set jobId {} in JobContext for queued job execution",
capturedJobIdForQueue);
Object result = work.get();
processJobResult(jobId, result);
processJobResult(capturedJobIdForQueue, result);
return result;
} catch (Exception e) {
log.error(
"Error executing queued job {}: {}", jobId, e.getMessage(), e);
taskManager.setError(jobId, e.getMessage());
"Error executing queued job {}: {}",
capturedJobIdForQueue,
e.getMessage(),
e);
taskManager.setError(capturedJobIdForQueue, e.getMessage());
throw e;
} finally {
// Clean up ThreadLocal to avoid memory leaks
stirling.software.common.util.JobContext.clear();
}
};
@@ -170,21 +184,36 @@ public class JobExecutorService {
return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null));
} else if (async) {
taskManager.createTask(jobId);
// Capture the jobId for the async thread
final String capturedJobId = jobId;
executor.execute(
() -> {
try {
log.debug(
"Running async job {} with timeout {} ms", jobId, timeoutToUse);
"Running async job {} with timeout {} ms",
capturedJobId,
timeoutToUse);
// Set jobId in ThreadLocal context for the async thread
stirling.software.common.util.JobContext.setJobId(capturedJobId);
log.debug(
"Set jobId {} in JobContext for async execution",
capturedJobId);
// Execute with timeout
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
processJobResult(jobId, result);
processJobResult(capturedJobId, result);
} catch (TimeoutException te) {
log.error("Job {} timed out after {} ms", jobId, timeoutToUse);
taskManager.setError(jobId, "Job timed out");
} catch (Exception e) {
log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
taskManager.setError(jobId, e.getMessage());
} finally {
// Clean up ThreadLocal to avoid memory leaks
stirling.software.common.util.JobContext.clear();
}
});
@@ -193,6 +222,10 @@ public class JobExecutorService {
try {
log.debug("Running sync job with timeout {} ms", timeoutToUse);
// Make jobId available to downstream components on the worker thread
stirling.software.common.util.JobContext.setJobId(jobId);
log.debug("Set jobId {} in JobContext for sync execution", jobId);
// Execute with timeout
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
@@ -212,6 +245,8 @@ public class JobExecutorService {
// Construct a JSON error response
return ResponseEntity.internalServerError()
.body(Map.of("error", "Job failed: " + e.getMessage()));
} finally {
stirling.software.common.util.JobContext.clear();
}
}
}
@@ -456,8 +491,23 @@ public class JobExecutorService {
throws TimeoutException, Exception {
// Use the same executor as other async jobs for consistency
// This ensures all operations run on the same thread pool
String currentJobId = stirling.software.common.util.JobContext.getJobId();
java.util.concurrent.CompletableFuture<T> future =
java.util.concurrent.CompletableFuture.supplyAsync(supplier, executor);
java.util.concurrent.CompletableFuture.supplyAsync(
() -> {
if (currentJobId != null) {
stirling.software.common.util.JobContext.setJobId(currentJobId);
}
try {
return supplier.get();
} finally {
if (currentJobId != null) {
stirling.software.common.util.JobContext.clear();
}
}
},
executor);
try {
return future.get(timeoutMs, TimeUnit.MILLISECONDS);

View File

@@ -0,0 +1,18 @@
package stirling.software.common.util;
/** Thread-local context for passing job ID across async boundaries */
public class JobContext {
private static final ThreadLocal<String> CURRENT_JOB_ID = new ThreadLocal<>();
public static void setJobId(String jobId) {
CURRENT_JOB_ID.set(jobId);
}
public static String getJobId() {
return CURRENT_JOB_ID.get();
}
public static void clear() {
CURRENT_JOB_ID.remove();
}
}

View File

@@ -94,6 +94,7 @@ public class ProcessExecutor {
.getProcessExecutor()
.getSessionLimit()
.getOcrMyPdfSessionLimit();
case CFF_CONVERTER -> 1;
};
long timeoutMinutes =
@@ -148,6 +149,7 @@ public class ProcessExecutor {
.getProcessExecutor()
.getTimeoutMinutes()
.getOcrMyPdfTimeoutMinutes();
case CFF_CONVERTER -> 5L;
};
return new ProcessExecutor(semaphoreLimit, liveUpdates, timeoutMinutes);
});
@@ -300,7 +302,8 @@ public class ProcessExecutor {
TESSERACT,
QPDF,
GHOSTSCRIPT,
OCR_MY_PDF
OCR_MY_PDF,
CFF_CONVERTER
}
public class ProcessExecutorResult {

View File

@@ -78,6 +78,23 @@ class JobExecutorServiceTest {
verify(request).setAttribute(eq("jobId"), anyString());
}
@Test
void shouldExposeJobIdInJobContextDuringSyncExecution() throws Exception {
// Given
Supplier<Object> work = stirling.software.common.util.JobContext::getJobId;
// When
ResponseEntity<?> response = jobExecutorService.runJobGeneric(false, work);
// Then
assertEquals(HttpStatus.OK, response.getStatusCode());
assertNotNull(response.getBody());
var requestJobIdCaptor = ArgumentCaptor.forClass(String.class);
verify(request).setAttribute(eq("jobId"), requestJobIdCaptor.capture());
assertEquals(requestJobIdCaptor.getValue(), response.getBody());
}
@Test
void shouldRunAsyncJobSuccessfully() throws Exception {
// Given