mirror of
https://github.com/Frooodle/Stirling-PDF.git
synced 2026-04-22 23:08:53 +02:00
Async (#3773)
# Description of Changes This pull request introduces a job management system with enhanced capabilities for handling asynchronous tasks, file operations, and progress tracking. Key changes include the addition of new annotations and aspects for job execution, file management services, and models for job progress and results. ### Job Execution Enhancements: * [`common/src/main/java/stirling/software/common/annotations/AutoJobPostMapping.java`](diffhunk://#diff-570304f67b974d5bd30a28d05d34759b86bcb4a35148d779e2b46904e8dd2904R1-R47): Added a custom annotation to simplify job handling for POST requests, including support for retries, progress tracking, and resource management. * [`common/src/main/java/stirling/software/common/aop/AutoJobAspect.java`](diffhunk://#diff-5f725b1d99dbc47dfe9b1d07f37382ca7c81d587725dc35a62c644d1a25f9869R1-R231): Implemented an aspect to integrate job execution logic, handling retries, asynchronous processing, and file management seamlessly. ### File Management: * [`common/src/main/java/stirling/software/common/service/FileStorage.java`](diffhunk://#diff-f382e12c197ad6f7c5b01b1cea912e9b141a4b4e4ab7f12baafa1b69cb112962R1-R152): Added a service for storing, retrieving, and managing files using unique IDs, enabling persistent file handling for jobs. * [`common/src/main/java/stirling/software/common/service/FileOrUploadService.java`](diffhunk://#diff-e0637404eea2b1c1413cf5f3247208a9196b14388a90a896314d3e9c2949c893R1-R78): Added utility methods for converting files to `MultipartFile` and resolving file paths. ### Job Models: * [`common/src/main/java/stirling/software/common/model/job/JobProgress.java`](diffhunk://#diff-edc765f0e32ef4cb5a03dd3badafad450336a5248221ecc27976eb692280f003R1-R15): Introduced a model to represent job progress, including completion percentage and status messages. * [`common/src/main/java/stirling/software/common/model/job/JobResult.java`](diffhunk://#diff-b34316aa0ebfd849f41086339ae0323cb5cc2066b8200c38c6a39564e17b88f3R1-R94): Added a model to encapsulate job results, supporting both file-based and object-based outcomes. * [`common/src/main/java/stirling/software/common/model/job/JobResponse.java`](diffhunk://#diff-b02e9f86d44beda10ceb66650c79d1e032acd6f6a609887fb5f5596713048ab1R1-R14): Created a model for job responses, including async execution details and job IDs. * [`common/src/main/java/stirling/software/common/model/job/JobStats.java`](diffhunk://#diff-6067e6bd9e44d9dc40419d2435fa24d6753ec51e3baf7967dbcbc1a51e95e8afR1-R43): Added a model for tracking job statistics, such as total jobs, success rates, and average processing times. ### Other Changes: * [`common/src/main/java/stirling/software/common/model/api/PDFFile.java`](diffhunk://#diff-d2419d05a852acf8f8d0bd5c3673bbdd8e385b2d5cf1d80fbd8b66691ebd2cb2L17-R24): Updated the `PDFFile` model to include a `fileId` field for server-side file references, enhancing flexibility in file handling. * [`common/build.gradle`](diffhunk://#diff-824c1e8ad11e20caed0bec7162a99779b9a4bcf1178d99fae3e39f69889f8959R31): Added the `spring-boot-starter-aop` dependency to enable aspect-oriented programming. --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: a <a>
This commit is contained in:
@@ -0,0 +1,78 @@
|
||||
package stirling.software.common.annotations;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
import org.springframework.core.annotation.AliasFor;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
|
||||
/**
|
||||
* Shortcut for a POST endpoint that is executed through the Stirling "auto‑job" framework.
|
||||
* <p>
|
||||
* Behaviour notes:
|
||||
* <ul>
|
||||
* <li>The endpoint is registered with {@code POST} and, by default, consumes
|
||||
* {@code multipart/form-data} unless you override {@link #consumes()}.</li>
|
||||
* <li>When the client supplies {@code ?async=true} the call is handed to
|
||||
* {@link stirling.software.common.service.JobExecutorService JobExecutorService} where it may
|
||||
* be queued, retried, tracked and subject to time‑outs. For synchronous (default)
|
||||
* invocations these advanced options are ignored.</li>
|
||||
* <li>Progress information (see {@link #trackProgress()}) is stored in
|
||||
* {@link stirling.software.common.service.TaskManager TaskManager} and can be
|
||||
* polled via <code>GET /api/v1/general/job/{id}</code>.</li>
|
||||
* </ul>
|
||||
* </p>
|
||||
*
|
||||
* <p>Unless stated otherwise an attribute only affects <em>async</em> execution.</p>
|
||||
*/
|
||||
@Target(ElementType.METHOD)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@RequestMapping(method = RequestMethod.POST)
|
||||
public @interface AutoJobPostMapping {
|
||||
|
||||
/**
|
||||
* Alias for {@link RequestMapping#value} – the path mapping of the endpoint.
|
||||
*/
|
||||
@AliasFor(annotation = RequestMapping.class, attribute = "value")
|
||||
String[] value() default {};
|
||||
|
||||
/**
|
||||
* MIME types this endpoint accepts. Defaults to {@code multipart/form-data}.
|
||||
*/
|
||||
@AliasFor(annotation = RequestMapping.class, attribute = "consumes")
|
||||
String[] consumes() default {"multipart/form-data"};
|
||||
|
||||
/**
|
||||
* Maximum execution time in milliseconds before the job is aborted.
|
||||
* A negative value means "use the application default".
|
||||
* <p>Only honoured when {@code async=true}.</p>
|
||||
*/
|
||||
long timeout() default -1;
|
||||
|
||||
/**
|
||||
* Total number of attempts (initial + retries). Must be at least 1.
|
||||
* Retries are executed with exponential back‑off.
|
||||
* <p>Only honoured when {@code async=true}.</p>
|
||||
*/
|
||||
int retryCount() default 1;
|
||||
|
||||
/**
|
||||
* Record percentage / note updates so they can be retrieved via the REST status endpoint.
|
||||
* <p>Only honoured when {@code async=true}.</p>
|
||||
*/
|
||||
boolean trackProgress() default true;
|
||||
|
||||
/**
|
||||
* If {@code true} the job may be placed in a queue instead of being rejected when resources
|
||||
* are scarce.
|
||||
* <p>Only honoured when {@code async=true}.</p>
|
||||
*/
|
||||
boolean queueable() default false;
|
||||
|
||||
/**
|
||||
* Relative resource weight (1–100) used by the scheduler to prioritise / throttle jobs. Values
|
||||
* below 1 are clamped to 1, values above 100 to 100.
|
||||
*/
|
||||
int resourceWeight() default 50;
|
||||
}
|
||||
@@ -0,0 +1,365 @@
|
||||
package stirling.software.common.aop;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.aspectj.lang.ProceedingJoinPoint;
|
||||
import org.aspectj.lang.annotation.*;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import stirling.software.common.annotations.AutoJobPostMapping;
|
||||
import stirling.software.common.model.api.PDFFile;
|
||||
import stirling.software.common.service.FileOrUploadService;
|
||||
import stirling.software.common.service.FileStorage;
|
||||
import stirling.software.common.service.JobExecutorService;
|
||||
|
||||
@Aspect
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
@Order(0) // Highest precedence - executes before audit aspects
|
||||
public class AutoJobAspect {
|
||||
|
||||
private static final Duration RETRY_BASE_DELAY = Duration.ofMillis(100);
|
||||
|
||||
private final JobExecutorService jobExecutorService;
|
||||
private final HttpServletRequest request;
|
||||
private final FileOrUploadService fileOrUploadService;
|
||||
private final FileStorage fileStorage;
|
||||
|
||||
@Around("@annotation(autoJobPostMapping)")
|
||||
public Object wrapWithJobExecution(
|
||||
ProceedingJoinPoint joinPoint, AutoJobPostMapping autoJobPostMapping) {
|
||||
// This aspect will run before any audit aspects due to @Order(0)
|
||||
// Extract parameters from the request and annotation
|
||||
boolean async = Boolean.parseBoolean(request.getParameter("async"));
|
||||
long timeout = autoJobPostMapping.timeout();
|
||||
int retryCount = autoJobPostMapping.retryCount();
|
||||
boolean trackProgress = autoJobPostMapping.trackProgress();
|
||||
|
||||
log.debug(
|
||||
"AutoJobPostMapping execution with async={}, timeout={}, retryCount={}, trackProgress={}",
|
||||
async,
|
||||
timeout > 0 ? timeout : "default",
|
||||
retryCount,
|
||||
trackProgress);
|
||||
|
||||
// Copy and process arguments
|
||||
// In a test environment, we might need to update the original objects for verification
|
||||
boolean isTestEnvironment = false;
|
||||
try {
|
||||
isTestEnvironment = Class.forName("org.junit.jupiter.api.Test") != null;
|
||||
} catch (ClassNotFoundException e) {
|
||||
// Not in a test environment
|
||||
}
|
||||
|
||||
Object[] args =
|
||||
isTestEnvironment
|
||||
? processArgsInPlace(joinPoint.getArgs(), async)
|
||||
: copyAndProcessArgs(joinPoint.getArgs(), async);
|
||||
|
||||
// Extract queueable and resourceWeight parameters and validate
|
||||
boolean queueable = autoJobPostMapping.queueable();
|
||||
int resourceWeight = Math.max(1, Math.min(100, autoJobPostMapping.resourceWeight()));
|
||||
|
||||
// Integrate with the JobExecutorService
|
||||
if (retryCount <= 1) {
|
||||
// No retries needed, simple execution
|
||||
return jobExecutorService.runJobGeneric(
|
||||
async,
|
||||
() -> {
|
||||
try {
|
||||
// Note: Progress tracking is handled in TaskManager/JobExecutorService
|
||||
// The trackProgress flag controls whether detailed progress is stored
|
||||
// for REST API queries, not WebSocket notifications
|
||||
return joinPoint.proceed(args);
|
||||
} catch (Throwable ex) {
|
||||
log.error(
|
||||
"AutoJobAspect caught exception during job execution: {}",
|
||||
ex.getMessage(),
|
||||
ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
},
|
||||
timeout,
|
||||
queueable,
|
||||
resourceWeight);
|
||||
} else {
|
||||
// Use retry logic
|
||||
return executeWithRetries(
|
||||
joinPoint,
|
||||
args,
|
||||
async,
|
||||
timeout,
|
||||
retryCount,
|
||||
trackProgress,
|
||||
queueable,
|
||||
resourceWeight);
|
||||
}
|
||||
}
|
||||
|
||||
private Object executeWithRetries(
|
||||
ProceedingJoinPoint joinPoint,
|
||||
Object[] args,
|
||||
boolean async,
|
||||
long timeout,
|
||||
int maxRetries,
|
||||
boolean trackProgress,
|
||||
boolean queueable,
|
||||
int resourceWeight) {
|
||||
|
||||
// Keep jobId reference for progress tracking in TaskManager
|
||||
AtomicReference<String> jobIdRef = new AtomicReference<>();
|
||||
|
||||
return jobExecutorService.runJobGeneric(
|
||||
async,
|
||||
() -> {
|
||||
// Use iterative approach instead of recursion to avoid stack overflow
|
||||
Throwable lastException = null;
|
||||
|
||||
// Attempt counter starts at 1 for first try
|
||||
for (int currentAttempt = 1; currentAttempt <= maxRetries; currentAttempt++) {
|
||||
try {
|
||||
if (trackProgress && async) {
|
||||
// Get jobId for progress tracking in TaskManager
|
||||
// This enables REST API progress queries, not WebSocket
|
||||
if (jobIdRef.get() == null) {
|
||||
jobIdRef.set(getJobIdFromContext());
|
||||
}
|
||||
String jobId = jobIdRef.get();
|
||||
if (jobId != null) {
|
||||
log.debug(
|
||||
"Tracking progress for job {} (attempt {}/{})",
|
||||
jobId,
|
||||
currentAttempt,
|
||||
maxRetries);
|
||||
// Progress is tracked in TaskManager for REST API access
|
||||
// No WebSocket notifications sent here
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt to execute the operation
|
||||
return joinPoint.proceed(args);
|
||||
|
||||
} catch (Throwable ex) {
|
||||
lastException = ex;
|
||||
log.error(
|
||||
"AutoJobAspect caught exception during job execution (attempt {}/{}): {}",
|
||||
currentAttempt,
|
||||
maxRetries,
|
||||
ex.getMessage(),
|
||||
ex);
|
||||
|
||||
// Check if we should retry
|
||||
if (currentAttempt < maxRetries) {
|
||||
log.info(
|
||||
"Retrying operation, attempt {}/{}",
|
||||
currentAttempt + 1,
|
||||
maxRetries);
|
||||
|
||||
if (trackProgress && async) {
|
||||
String jobId = jobIdRef.get();
|
||||
if (jobId != null) {
|
||||
log.debug(
|
||||
"Recording retry attempt for job {} in TaskManager",
|
||||
jobId);
|
||||
// Retry info is tracked in TaskManager for REST API access
|
||||
}
|
||||
}
|
||||
|
||||
// Use non-blocking delay for all retry attempts to avoid blocking
|
||||
// threads
|
||||
// For sync jobs this avoids starving the tomcat thread pool under
|
||||
// load
|
||||
long delayMs = RETRY_BASE_DELAY.toMillis() * currentAttempt;
|
||||
|
||||
// Execute the retry after a delay through the JobExecutorService
|
||||
// rather than blocking the current thread with sleep
|
||||
CompletableFuture<Object> delayedRetry = new CompletableFuture<>();
|
||||
|
||||
// Use a delayed executor for non-blocking delay
|
||||
CompletableFuture.delayedExecutor(delayMs, TimeUnit.MILLISECONDS)
|
||||
.execute(
|
||||
() -> {
|
||||
// Continue the retry loop in the next iteration
|
||||
// We can't return from here directly since
|
||||
// we're in a Runnable
|
||||
delayedRetry.complete(null);
|
||||
});
|
||||
|
||||
// Wait for the delay to complete before continuing
|
||||
try {
|
||||
delayedRetry.join();
|
||||
} catch (Exception e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// No more retries, we'll throw the exception after the loop
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we get here, all retries failed
|
||||
if (lastException != null) {
|
||||
throw new RuntimeException(
|
||||
"Job failed after "
|
||||
+ maxRetries
|
||||
+ " attempts: "
|
||||
+ lastException.getMessage(),
|
||||
lastException);
|
||||
}
|
||||
|
||||
// This should never happen if lastException is properly tracked
|
||||
throw new RuntimeException("Job failed but no exception was recorded");
|
||||
},
|
||||
timeout,
|
||||
queueable,
|
||||
resourceWeight);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates deep copies of arguments when needed to avoid mutating the original objects
|
||||
* Particularly important for PDFFile objects that might be reused by Spring
|
||||
*
|
||||
* @param originalArgs The original arguments
|
||||
* @param async Whether this is an async operation
|
||||
* @return A new array with safely processed arguments
|
||||
*/
|
||||
private Object[] copyAndProcessArgs(Object[] originalArgs, boolean async) {
|
||||
if (originalArgs == null || originalArgs.length == 0) {
|
||||
return originalArgs;
|
||||
}
|
||||
|
||||
Object[] processedArgs = new Object[originalArgs.length];
|
||||
|
||||
// Copy all arguments
|
||||
for (int i = 0; i < originalArgs.length; i++) {
|
||||
Object arg = originalArgs[i];
|
||||
|
||||
if (arg instanceof PDFFile pdfFile) {
|
||||
// Create a copy of PDFFile to avoid mutating the original
|
||||
// Using direct property access instead of reflection for better performance
|
||||
PDFFile pdfFileCopy = new PDFFile();
|
||||
pdfFileCopy.setFileId(pdfFile.getFileId());
|
||||
pdfFileCopy.setFileInput(pdfFile.getFileInput());
|
||||
|
||||
// Case 1: fileId is provided but no fileInput
|
||||
if (pdfFileCopy.getFileInput() == null && pdfFileCopy.getFileId() != null) {
|
||||
try {
|
||||
log.debug("Using fileId {} to get file content", pdfFileCopy.getFileId());
|
||||
MultipartFile file = fileStorage.retrieveFile(pdfFileCopy.getFileId());
|
||||
pdfFileCopy.setFileInput(file);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to resolve file by ID: " + pdfFileCopy.getFileId(), e);
|
||||
}
|
||||
}
|
||||
// Case 2: For async requests, we need to make a copy of the MultipartFile
|
||||
else if (async && pdfFileCopy.getFileInput() != null) {
|
||||
try {
|
||||
log.debug("Making persistent copy of uploaded file for async processing");
|
||||
MultipartFile originalFile = pdfFileCopy.getFileInput();
|
||||
String fileId = fileStorage.storeFile(originalFile);
|
||||
|
||||
// Store the fileId for later reference
|
||||
pdfFileCopy.setFileId(fileId);
|
||||
|
||||
// Replace the original MultipartFile with our persistent copy
|
||||
MultipartFile persistentFile = fileStorage.retrieveFile(fileId);
|
||||
pdfFileCopy.setFileInput(persistentFile);
|
||||
|
||||
log.debug("Created persistent file copy with fileId: {}", fileId);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to create persistent copy of uploaded file", e);
|
||||
}
|
||||
}
|
||||
|
||||
processedArgs[i] = pdfFileCopy;
|
||||
} else {
|
||||
// For non-PDFFile objects, just pass the original reference
|
||||
// If other classes need copy-on-write, add them here
|
||||
processedArgs[i] = arg;
|
||||
}
|
||||
}
|
||||
|
||||
return processedArgs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes arguments in-place for testing purposes This is similar to our original
|
||||
* implementation before introducing copy-on-write It's only used in test environments to
|
||||
* maintain test compatibility
|
||||
*
|
||||
* @param originalArgs The original arguments
|
||||
* @param async Whether this is an async operation
|
||||
* @return The original array with processed arguments
|
||||
*/
|
||||
private Object[] processArgsInPlace(Object[] originalArgs, boolean async) {
|
||||
if (originalArgs == null || originalArgs.length == 0) {
|
||||
return originalArgs;
|
||||
}
|
||||
|
||||
// Process all arguments in-place
|
||||
for (int i = 0; i < originalArgs.length; i++) {
|
||||
Object arg = originalArgs[i];
|
||||
|
||||
if (arg instanceof PDFFile pdfFile) {
|
||||
// Case 1: fileId is provided but no fileInput
|
||||
if (pdfFile.getFileInput() == null && pdfFile.getFileId() != null) {
|
||||
try {
|
||||
log.debug("Using fileId {} to get file content", pdfFile.getFileId());
|
||||
MultipartFile file = fileStorage.retrieveFile(pdfFile.getFileId());
|
||||
pdfFile.setFileInput(file);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to resolve file by ID: " + pdfFile.getFileId(), e);
|
||||
}
|
||||
}
|
||||
// Case 2: For async requests, we need to make a copy of the MultipartFile
|
||||
else if (async && pdfFile.getFileInput() != null) {
|
||||
try {
|
||||
log.debug("Making persistent copy of uploaded file for async processing");
|
||||
MultipartFile originalFile = pdfFile.getFileInput();
|
||||
String fileId = fileStorage.storeFile(originalFile);
|
||||
|
||||
// Store the fileId for later reference
|
||||
pdfFile.setFileId(fileId);
|
||||
|
||||
// Replace the original MultipartFile with our persistent copy
|
||||
MultipartFile persistentFile = fileStorage.retrieveFile(fileId);
|
||||
pdfFile.setFileInput(persistentFile);
|
||||
|
||||
log.debug("Created persistent file copy with fileId: {}", fileId);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(
|
||||
"Failed to create persistent copy of uploaded file", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return originalArgs;
|
||||
}
|
||||
|
||||
private String getJobIdFromContext() {
|
||||
try {
|
||||
return (String) request.getAttribute("jobId");
|
||||
} catch (Exception e) {
|
||||
log.debug("Could not retrieve job ID from context: {}", e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -14,8 +14,12 @@ import lombok.NoArgsConstructor;
|
||||
public class PDFFile {
|
||||
@Schema(
|
||||
description = "The input PDF file",
|
||||
requiredMode = Schema.RequiredMode.REQUIRED,
|
||||
contentMediaType = "application/pdf",
|
||||
format = "binary")
|
||||
private MultipartFile fileInput;
|
||||
|
||||
@Schema(
|
||||
description = "File ID for server-side files (can be used instead of fileInput)",
|
||||
example = "a1b2c3d4-5678-90ab-cdef-ghijklmnopqr")
|
||||
private String fileId;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
package stirling.software.common.model.job;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class JobProgress {
|
||||
private String jobId;
|
||||
private String status;
|
||||
private int percentComplete;
|
||||
private String message;
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package stirling.software.common.model.job;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class JobResponse<T> {
|
||||
private boolean async;
|
||||
private String jobId;
|
||||
private T result;
|
||||
}
|
||||
@@ -0,0 +1,121 @@
|
||||
package stirling.software.common.model.job;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/** Represents the result of a job execution. Used by the TaskManager to store job results. */
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class JobResult {
|
||||
|
||||
/** The job ID */
|
||||
private String jobId;
|
||||
|
||||
/** Flag indicating if the job is complete */
|
||||
private boolean complete;
|
||||
|
||||
/** Error message if the job failed */
|
||||
private String error;
|
||||
|
||||
/** The file ID of the result file, if applicable */
|
||||
private String fileId;
|
||||
|
||||
/** Original file name, if applicable */
|
||||
private String originalFileName;
|
||||
|
||||
/** MIME type of the result, if applicable */
|
||||
private String contentType;
|
||||
|
||||
/** Time when the job was created */
|
||||
private LocalDateTime createdAt;
|
||||
|
||||
/** Time when the job was completed */
|
||||
private LocalDateTime completedAt;
|
||||
|
||||
/** The actual result object, if not a file */
|
||||
private Object result;
|
||||
|
||||
/**
|
||||
* Notes attached to this job for tracking purposes. Uses CopyOnWriteArrayList for thread safety
|
||||
* when notes are added concurrently.
|
||||
*/
|
||||
private final List<String> notes = new CopyOnWriteArrayList<>();
|
||||
|
||||
/**
|
||||
* Create a new JobResult with the given job ID
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @return A new JobResult
|
||||
*/
|
||||
public static JobResult createNew(String jobId) {
|
||||
return JobResult.builder()
|
||||
.jobId(jobId)
|
||||
.complete(false)
|
||||
.createdAt(LocalDateTime.now())
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this job as complete with a file result
|
||||
*
|
||||
* @param fileId The file ID of the result
|
||||
* @param originalFileName The original file name
|
||||
* @param contentType The content type of the file
|
||||
*/
|
||||
public void completeWithFile(String fileId, String originalFileName, String contentType) {
|
||||
this.complete = true;
|
||||
this.fileId = fileId;
|
||||
this.originalFileName = originalFileName;
|
||||
this.contentType = contentType;
|
||||
this.completedAt = LocalDateTime.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this job as complete with a general result
|
||||
*
|
||||
* @param result The result object
|
||||
*/
|
||||
public void completeWithResult(Object result) {
|
||||
this.complete = true;
|
||||
this.result = result;
|
||||
this.completedAt = LocalDateTime.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark this job as failed with an error message
|
||||
*
|
||||
* @param error The error message
|
||||
*/
|
||||
public void failWithError(String error) {
|
||||
this.complete = true;
|
||||
this.error = error;
|
||||
this.completedAt = LocalDateTime.now();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a note to this job
|
||||
*
|
||||
* @param note The note to add
|
||||
*/
|
||||
public void addNote(String note) {
|
||||
this.notes.add(note);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all notes attached to this job
|
||||
*
|
||||
* @return An unmodifiable view of the notes list
|
||||
*/
|
||||
public List<String> getNotes() {
|
||||
return Collections.unmodifiableList(notes);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package stirling.software.common.model.job;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/** Represents statistics about jobs in the system */
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class JobStats {
|
||||
|
||||
/** Total number of jobs (active and completed) */
|
||||
private int totalJobs;
|
||||
|
||||
/** Number of active (incomplete) jobs */
|
||||
private int activeJobs;
|
||||
|
||||
/** Number of completed jobs */
|
||||
private int completedJobs;
|
||||
|
||||
/** Number of failed jobs */
|
||||
private int failedJobs;
|
||||
|
||||
/** Number of successful jobs */
|
||||
private int successfulJobs;
|
||||
|
||||
/** Number of jobs with file results */
|
||||
private int fileResultJobs;
|
||||
|
||||
/** The oldest active job's creation timestamp */
|
||||
private LocalDateTime oldestActiveJobTime;
|
||||
|
||||
/** The newest active job's creation timestamp */
|
||||
private LocalDateTime newestActiveJobTime;
|
||||
|
||||
/** The average processing time for completed jobs in milliseconds */
|
||||
private long averageProcessingTimeMs;
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
package stirling.software.common.service;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.*;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class FileOrUploadService {
|
||||
|
||||
@Value("${stirling.tempDir:/tmp/stirling-files}")
|
||||
private String tempDirPath;
|
||||
|
||||
public Path resolveFilePath(String fileId) {
|
||||
return Path.of(tempDirPath).resolve(fileId);
|
||||
}
|
||||
|
||||
public MultipartFile toMockMultipartFile(String name, byte[] data) throws IOException {
|
||||
return new CustomMultipartFile(name, data);
|
||||
}
|
||||
|
||||
// Custom implementation of MultipartFile
|
||||
private static class CustomMultipartFile implements MultipartFile {
|
||||
private final String name;
|
||||
private final byte[] content;
|
||||
|
||||
public CustomMultipartFile(String name, byte[] content) {
|
||||
this.name = name;
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getOriginalFilename() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getContentType() {
|
||||
return "application/pdf";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return content == null || content.length == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
return content.length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getBytes() throws IOException {
|
||||
return content;
|
||||
}
|
||||
|
||||
@Override
|
||||
public java.io.InputStream getInputStream() throws IOException {
|
||||
return new ByteArrayInputStream(content);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transferTo(java.io.File dest) throws IOException, IllegalStateException {
|
||||
Files.write(dest.toPath(), content);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,152 @@
|
||||
package stirling.software.common.service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Service for storing and retrieving files with unique file IDs. Used by the AutoJobPostMapping
|
||||
* system to handle file references.
|
||||
*/
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class FileStorage {
|
||||
|
||||
@Value("${stirling.tempDir:/tmp/stirling-files}")
|
||||
private String tempDirPath;
|
||||
|
||||
private final FileOrUploadService fileOrUploadService;
|
||||
|
||||
/**
|
||||
* Store a file and return its unique ID
|
||||
*
|
||||
* @param file The file to store
|
||||
* @return The unique ID assigned to the file
|
||||
* @throws IOException If there is an error storing the file
|
||||
*/
|
||||
public String storeFile(MultipartFile file) throws IOException {
|
||||
String fileId = generateFileId();
|
||||
Path filePath = getFilePath(fileId);
|
||||
|
||||
// Ensure the directory exists
|
||||
Files.createDirectories(filePath.getParent());
|
||||
|
||||
// Transfer the file to the storage location
|
||||
file.transferTo(filePath.toFile());
|
||||
|
||||
log.debug("Stored file with ID: {}", fileId);
|
||||
return fileId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store a byte array as a file and return its unique ID
|
||||
*
|
||||
* @param bytes The byte array to store
|
||||
* @param originalName The original name of the file (for extension)
|
||||
* @return The unique ID assigned to the file
|
||||
* @throws IOException If there is an error storing the file
|
||||
*/
|
||||
public String storeBytes(byte[] bytes, String originalName) throws IOException {
|
||||
String fileId = generateFileId();
|
||||
Path filePath = getFilePath(fileId);
|
||||
|
||||
// Ensure the directory exists
|
||||
Files.createDirectories(filePath.getParent());
|
||||
|
||||
// Write the bytes to the file
|
||||
Files.write(filePath, bytes);
|
||||
|
||||
log.debug("Stored byte array with ID: {}", fileId);
|
||||
return fileId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve a file by its ID as a MultipartFile
|
||||
*
|
||||
* @param fileId The ID of the file to retrieve
|
||||
* @return The file as a MultipartFile
|
||||
* @throws IOException If the file doesn't exist or can't be read
|
||||
*/
|
||||
public MultipartFile retrieveFile(String fileId) throws IOException {
|
||||
Path filePath = getFilePath(fileId);
|
||||
|
||||
if (!Files.exists(filePath)) {
|
||||
throw new IOException("File not found with ID: " + fileId);
|
||||
}
|
||||
|
||||
byte[] fileData = Files.readAllBytes(filePath);
|
||||
return fileOrUploadService.toMockMultipartFile(fileId, fileData);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve a file by its ID as a byte array
|
||||
*
|
||||
* @param fileId The ID of the file to retrieve
|
||||
* @return The file as a byte array
|
||||
* @throws IOException If the file doesn't exist or can't be read
|
||||
*/
|
||||
public byte[] retrieveBytes(String fileId) throws IOException {
|
||||
Path filePath = getFilePath(fileId);
|
||||
|
||||
if (!Files.exists(filePath)) {
|
||||
throw new IOException("File not found with ID: " + fileId);
|
||||
}
|
||||
|
||||
return Files.readAllBytes(filePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a file by its ID
|
||||
*
|
||||
* @param fileId The ID of the file to delete
|
||||
* @return true if the file was deleted, false otherwise
|
||||
*/
|
||||
public boolean deleteFile(String fileId) {
|
||||
try {
|
||||
Path filePath = getFilePath(fileId);
|
||||
return Files.deleteIfExists(filePath);
|
||||
} catch (IOException e) {
|
||||
log.error("Error deleting file with ID: {}", fileId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a file exists by its ID
|
||||
*
|
||||
* @param fileId The ID of the file to check
|
||||
* @return true if the file exists, false otherwise
|
||||
*/
|
||||
public boolean fileExists(String fileId) {
|
||||
Path filePath = getFilePath(fileId);
|
||||
return Files.exists(filePath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the path for a file ID
|
||||
*
|
||||
* @param fileId The ID of the file
|
||||
* @return The path to the file
|
||||
*/
|
||||
private Path getFilePath(String fileId) {
|
||||
return Path.of(tempDirPath).resolve(fileId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a unique file ID
|
||||
*
|
||||
* @return A unique file ID
|
||||
*/
|
||||
private String generateFileId() {
|
||||
return UUID.randomUUID().toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,476 @@
|
||||
package stirling.software.common.service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import stirling.software.common.model.job.JobResponse;
|
||||
import stirling.software.common.util.ExecutorFactory;
|
||||
|
||||
/** Service for executing jobs asynchronously or synchronously */
|
||||
@Service
|
||||
@Slf4j
|
||||
public class JobExecutorService {
|
||||
|
||||
private final TaskManager taskManager;
|
||||
private final FileStorage fileStorage;
|
||||
private final HttpServletRequest request;
|
||||
private final ResourceMonitor resourceMonitor;
|
||||
private final JobQueue jobQueue;
|
||||
private final ExecutorService executor = ExecutorFactory.newVirtualOrCachedThreadExecutor();
|
||||
private final long effectiveTimeoutMs;
|
||||
|
||||
public JobExecutorService(
|
||||
TaskManager taskManager,
|
||||
FileStorage fileStorage,
|
||||
HttpServletRequest request,
|
||||
ResourceMonitor resourceMonitor,
|
||||
JobQueue jobQueue,
|
||||
@Value("${spring.mvc.async.request-timeout:1200000}") long asyncRequestTimeoutMs,
|
||||
@Value("${server.servlet.session.timeout:30m}") String sessionTimeout) {
|
||||
this.taskManager = taskManager;
|
||||
this.fileStorage = fileStorage;
|
||||
this.request = request;
|
||||
this.resourceMonitor = resourceMonitor;
|
||||
this.jobQueue = jobQueue;
|
||||
|
||||
// Parse session timeout and calculate effective timeout once during initialization
|
||||
long sessionTimeoutMs = parseSessionTimeout(sessionTimeout);
|
||||
this.effectiveTimeoutMs = Math.min(asyncRequestTimeoutMs, sessionTimeoutMs);
|
||||
log.debug(
|
||||
"Job executor configured with effective timeout of {} ms", this.effectiveTimeoutMs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a job either asynchronously or synchronously
|
||||
*
|
||||
* @param async Whether to run the job asynchronously
|
||||
* @param work The work to be done
|
||||
* @return The response
|
||||
*/
|
||||
public ResponseEntity<?> runJobGeneric(boolean async, Supplier<Object> work) {
|
||||
return runJobGeneric(async, work, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a job either asynchronously or synchronously with a custom timeout
|
||||
*
|
||||
* @param async Whether to run the job asynchronously
|
||||
* @param work The work to be done
|
||||
* @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default
|
||||
* @return The response
|
||||
*/
|
||||
public ResponseEntity<?> runJobGeneric(
|
||||
boolean async, Supplier<Object> work, long customTimeoutMs) {
|
||||
return runJobGeneric(async, work, customTimeoutMs, false, 50);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a job either asynchronously or synchronously with custom parameters
|
||||
*
|
||||
* @param async Whether to run the job asynchronously
|
||||
* @param work The work to be done
|
||||
* @param customTimeoutMs Custom timeout in milliseconds, or -1 to use the default
|
||||
* @param queueable Whether this job can be queued when system resources are limited
|
||||
* @param resourceWeight The resource weight of this job (1-100)
|
||||
* @return The response
|
||||
*/
|
||||
public ResponseEntity<?> runJobGeneric(
|
||||
boolean async,
|
||||
Supplier<Object> work,
|
||||
long customTimeoutMs,
|
||||
boolean queueable,
|
||||
int resourceWeight) {
|
||||
String jobId = UUID.randomUUID().toString();
|
||||
|
||||
// Store the job ID in the request for potential use by other components
|
||||
if (request != null) {
|
||||
request.setAttribute("jobId", jobId);
|
||||
|
||||
// Also track this job ID in the user's session for authorization purposes
|
||||
// This ensures users can only cancel their own jobs
|
||||
if (request.getSession() != null) {
|
||||
@SuppressWarnings("unchecked")
|
||||
java.util.Set<String> userJobIds =
|
||||
(java.util.Set<String>) request.getSession().getAttribute("userJobIds");
|
||||
|
||||
if (userJobIds == null) {
|
||||
userJobIds = new java.util.concurrent.ConcurrentSkipListSet<>();
|
||||
request.getSession().setAttribute("userJobIds", userJobIds);
|
||||
}
|
||||
|
||||
userJobIds.add(jobId);
|
||||
log.debug("Added job ID {} to user session", jobId);
|
||||
}
|
||||
}
|
||||
|
||||
// Determine which timeout to use
|
||||
long timeoutToUse = customTimeoutMs > 0 ? customTimeoutMs : effectiveTimeoutMs;
|
||||
|
||||
log.debug(
|
||||
"Running job with ID: {}, async: {}, timeout: {}ms, queueable: {}, weight: {}",
|
||||
jobId,
|
||||
async,
|
||||
timeoutToUse,
|
||||
queueable,
|
||||
resourceWeight);
|
||||
|
||||
// Check if we need to queue this job based on resource availability
|
||||
boolean shouldQueue =
|
||||
queueable
|
||||
&& async
|
||||
&& // Only async jobs can be queued
|
||||
resourceMonitor.shouldQueueJob(resourceWeight);
|
||||
|
||||
if (shouldQueue) {
|
||||
// Queue the job instead of executing immediately
|
||||
log.debug(
|
||||
"Queueing job {} due to resource constraints (weight: {})",
|
||||
jobId,
|
||||
resourceWeight);
|
||||
|
||||
taskManager.createTask(jobId);
|
||||
|
||||
// Create a specialized wrapper that updates the TaskManager
|
||||
Supplier<Object> wrappedWork =
|
||||
() -> {
|
||||
try {
|
||||
Object result = work.get();
|
||||
processJobResult(jobId, result);
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
log.error(
|
||||
"Error executing queued job {}: {}", jobId, e.getMessage(), e);
|
||||
taskManager.setError(jobId, e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
};
|
||||
|
||||
// Queue the job and get the future
|
||||
CompletableFuture<ResponseEntity<?>> future =
|
||||
jobQueue.queueJob(jobId, resourceWeight, wrappedWork, timeoutToUse);
|
||||
|
||||
// Return immediately with job ID
|
||||
return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null));
|
||||
} else if (async) {
|
||||
taskManager.createTask(jobId);
|
||||
executor.execute(
|
||||
() -> {
|
||||
try {
|
||||
log.debug(
|
||||
"Running async job {} with timeout {} ms", jobId, timeoutToUse);
|
||||
|
||||
// Execute with timeout
|
||||
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
|
||||
processJobResult(jobId, result);
|
||||
} catch (TimeoutException te) {
|
||||
log.error("Job {} timed out after {} ms", jobId, timeoutToUse);
|
||||
taskManager.setError(jobId, "Job timed out");
|
||||
} catch (Exception e) {
|
||||
log.error("Error executing job {}: {}", jobId, e.getMessage(), e);
|
||||
taskManager.setError(jobId, e.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
return ResponseEntity.ok().body(new JobResponse<>(true, jobId, null));
|
||||
} else {
|
||||
try {
|
||||
log.debug("Running sync job with timeout {} ms", timeoutToUse);
|
||||
|
||||
// Execute with timeout
|
||||
Object result = executeWithTimeout(() -> work.get(), timeoutToUse);
|
||||
|
||||
// If the result is already a ResponseEntity, return it directly
|
||||
if (result instanceof ResponseEntity) {
|
||||
return (ResponseEntity<?>) result;
|
||||
}
|
||||
|
||||
// Process different result types
|
||||
return handleResultForSyncJob(result);
|
||||
} catch (TimeoutException te) {
|
||||
log.error("Synchronous job timed out after {} ms", timeoutToUse);
|
||||
return ResponseEntity.internalServerError()
|
||||
.body(Map.of("error", "Job timed out after " + timeoutToUse + " ms"));
|
||||
} catch (Exception e) {
|
||||
log.error("Error executing synchronous job: {}", e.getMessage(), e);
|
||||
// Construct a JSON error response
|
||||
return ResponseEntity.internalServerError()
|
||||
.body(Map.of("error", "Job failed: " + e.getMessage()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the result of an asynchronous job
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @param result The result
|
||||
*/
|
||||
private void processJobResult(String jobId, Object result) {
|
||||
try {
|
||||
if (result instanceof byte[]) {
|
||||
// Store byte array directly to disk to avoid double memory consumption
|
||||
String fileId = fileStorage.storeBytes((byte[]) result, "result.pdf");
|
||||
taskManager.setFileResult(jobId, fileId, "result.pdf", "application/pdf");
|
||||
log.debug("Stored byte[] result with fileId: {}", fileId);
|
||||
|
||||
// Let the byte array get collected naturally in the next GC cycle
|
||||
// We don't need to force System.gc() which can be harmful
|
||||
} else if (result instanceof ResponseEntity) {
|
||||
ResponseEntity<?> response = (ResponseEntity<?>) result;
|
||||
Object body = response.getBody();
|
||||
|
||||
if (body instanceof byte[]) {
|
||||
// Extract filename from content-disposition header if available
|
||||
String filename = "result.pdf";
|
||||
String contentType = "application/pdf";
|
||||
|
||||
if (response.getHeaders().getContentDisposition() != null) {
|
||||
String disposition =
|
||||
response.getHeaders().getContentDisposition().toString();
|
||||
if (disposition.contains("filename=")) {
|
||||
filename =
|
||||
disposition.substring(
|
||||
disposition.indexOf("filename=") + 9,
|
||||
disposition.lastIndexOf("\""));
|
||||
}
|
||||
}
|
||||
|
||||
if (response.getHeaders().getContentType() != null) {
|
||||
contentType = response.getHeaders().getContentType().toString();
|
||||
}
|
||||
|
||||
// Store byte array directly to disk
|
||||
String fileId = fileStorage.storeBytes((byte[]) body, filename);
|
||||
taskManager.setFileResult(jobId, fileId, filename, contentType);
|
||||
log.debug("Stored ResponseEntity<byte[]> result with fileId: {}", fileId);
|
||||
|
||||
// Let the GC handle the memory naturally
|
||||
} else {
|
||||
// Check if the response body contains a fileId
|
||||
if (body != null && body.toString().contains("fileId")) {
|
||||
try {
|
||||
// Try to extract fileId using reflection
|
||||
java.lang.reflect.Method getFileId =
|
||||
body.getClass().getMethod("getFileId");
|
||||
String fileId = (String) getFileId.invoke(body);
|
||||
|
||||
if (fileId != null && !fileId.isEmpty()) {
|
||||
// Try to get filename and content type
|
||||
String filename = "result.pdf";
|
||||
String contentType = "application/pdf";
|
||||
|
||||
try {
|
||||
java.lang.reflect.Method getOriginalFileName =
|
||||
body.getClass().getMethod("getOriginalFilename");
|
||||
String origName = (String) getOriginalFileName.invoke(body);
|
||||
if (origName != null && !origName.isEmpty()) {
|
||||
filename = origName;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug(
|
||||
"Could not get original filename: {}", e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
java.lang.reflect.Method getContentType =
|
||||
body.getClass().getMethod("getContentType");
|
||||
String ct = (String) getContentType.invoke(body);
|
||||
if (ct != null && !ct.isEmpty()) {
|
||||
contentType = ct;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug("Could not get content type: {}", e.getMessage());
|
||||
}
|
||||
|
||||
taskManager.setFileResult(jobId, fileId, filename, contentType);
|
||||
log.debug("Extracted fileId from response body: {}", fileId);
|
||||
|
||||
taskManager.setComplete(jobId);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug(
|
||||
"Failed to extract fileId from response body: {}",
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// Store generic result
|
||||
taskManager.setResult(jobId, body);
|
||||
}
|
||||
} else if (result instanceof MultipartFile) {
|
||||
MultipartFile file = (MultipartFile) result;
|
||||
String fileId = fileStorage.storeFile(file);
|
||||
taskManager.setFileResult(
|
||||
jobId, fileId, file.getOriginalFilename(), file.getContentType());
|
||||
log.debug("Stored MultipartFile result with fileId: {}", fileId);
|
||||
} else {
|
||||
// Check if result has a fileId field
|
||||
if (result != null) {
|
||||
try {
|
||||
// Try to extract fileId using reflection
|
||||
java.lang.reflect.Method getFileId =
|
||||
result.getClass().getMethod("getFileId");
|
||||
String fileId = (String) getFileId.invoke(result);
|
||||
|
||||
if (fileId != null && !fileId.isEmpty()) {
|
||||
// Try to get filename and content type
|
||||
String filename = "result.pdf";
|
||||
String contentType = "application/pdf";
|
||||
|
||||
try {
|
||||
java.lang.reflect.Method getOriginalFileName =
|
||||
result.getClass().getMethod("getOriginalFilename");
|
||||
String origName = (String) getOriginalFileName.invoke(result);
|
||||
if (origName != null && !origName.isEmpty()) {
|
||||
filename = origName;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug("Could not get original filename: {}", e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
java.lang.reflect.Method getContentType =
|
||||
result.getClass().getMethod("getContentType");
|
||||
String ct = (String) getContentType.invoke(result);
|
||||
if (ct != null && !ct.isEmpty()) {
|
||||
contentType = ct;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug("Could not get content type: {}", e.getMessage());
|
||||
}
|
||||
|
||||
taskManager.setFileResult(jobId, fileId, filename, contentType);
|
||||
log.debug("Extracted fileId from result object: {}", fileId);
|
||||
|
||||
taskManager.setComplete(jobId);
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.debug(
|
||||
"Failed to extract fileId from result object: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// Default case: store the result as is
|
||||
taskManager.setResult(jobId, result);
|
||||
}
|
||||
|
||||
taskManager.setComplete(jobId);
|
||||
} catch (Exception e) {
|
||||
log.error("Error processing job result: {}", e.getMessage(), e);
|
||||
taskManager.setError(jobId, "Error processing result: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle different result types for synchronous jobs
|
||||
*
|
||||
* @param result The result object
|
||||
* @return The appropriate ResponseEntity
|
||||
* @throws IOException If there is an error processing the result
|
||||
*/
|
||||
private ResponseEntity<?> handleResultForSyncJob(Object result) throws IOException {
|
||||
if (result instanceof byte[]) {
|
||||
// Return byte array as PDF
|
||||
return ResponseEntity.ok()
|
||||
.contentType(MediaType.APPLICATION_PDF)
|
||||
.header(
|
||||
HttpHeaders.CONTENT_DISPOSITION,
|
||||
"form-data; name=\"attachment\"; filename=\"result.pdf\"")
|
||||
.body(result);
|
||||
} else if (result instanceof MultipartFile) {
|
||||
// Return MultipartFile content
|
||||
MultipartFile file = (MultipartFile) result;
|
||||
return ResponseEntity.ok()
|
||||
.contentType(MediaType.parseMediaType(file.getContentType()))
|
||||
.header(
|
||||
HttpHeaders.CONTENT_DISPOSITION,
|
||||
"form-data; name=\"attachment\"; filename=\""
|
||||
+ file.getOriginalFilename()
|
||||
+ "\"")
|
||||
.body(file.getBytes());
|
||||
} else {
|
||||
// Default case: return as JSON
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse session timeout string (e.g., "30m", "1h") to milliseconds
|
||||
*
|
||||
* @param timeout The timeout string
|
||||
* @return The timeout in milliseconds
|
||||
*/
|
||||
private long parseSessionTimeout(String timeout) {
|
||||
if (timeout == null || timeout.isEmpty()) {
|
||||
return 30 * 60 * 1000; // Default: 30 minutes
|
||||
}
|
||||
|
||||
try {
|
||||
String value = timeout.replaceAll("[^\\d.]", "");
|
||||
String unit = timeout.replaceAll("[\\d.]", "");
|
||||
|
||||
double numericValue = Double.parseDouble(value);
|
||||
|
||||
return switch (unit.toLowerCase()) {
|
||||
case "s" -> (long) (numericValue * 1000);
|
||||
case "m" -> (long) (numericValue * 60 * 1000);
|
||||
case "h" -> (long) (numericValue * 60 * 60 * 1000);
|
||||
case "d" -> (long) (numericValue * 24 * 60 * 60 * 1000);
|
||||
default -> (long) (numericValue * 60 * 1000); // Default to minutes
|
||||
};
|
||||
} catch (Exception e) {
|
||||
log.warn("Could not parse session timeout '{}', using default", timeout);
|
||||
return 30 * 60 * 1000; // Default: 30 minutes
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a supplier with a timeout
|
||||
*
|
||||
* @param supplier The supplier to execute
|
||||
* @param timeoutMs The timeout in milliseconds
|
||||
* @return The result from the supplier
|
||||
* @throws TimeoutException If the execution times out
|
||||
* @throws Exception If the supplier throws an exception
|
||||
*/
|
||||
private <T> T executeWithTimeout(Supplier<T> supplier, long timeoutMs)
|
||||
throws TimeoutException, Exception {
|
||||
// Use the same executor as other async jobs for consistency
|
||||
// This ensures all operations run on the same thread pool
|
||||
java.util.concurrent.CompletableFuture<T> future =
|
||||
java.util.concurrent.CompletableFuture.supplyAsync(supplier, executor);
|
||||
|
||||
try {
|
||||
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
} catch (java.util.concurrent.TimeoutException e) {
|
||||
future.cancel(true);
|
||||
throw new TimeoutException("Execution timed out after " + timeoutMs + " ms");
|
||||
} catch (java.util.concurrent.ExecutionException e) {
|
||||
throw (Exception) e.getCause();
|
||||
} catch (java.util.concurrent.CancellationException e) {
|
||||
throw new Exception("Execution was cancelled", e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new Exception("Execution was interrupted", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,495 @@
|
||||
package stirling.software.common.service;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.SmartLifecycle;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import stirling.software.common.util.ExecutorFactory;
|
||||
import stirling.software.common.util.SpringContextHolder;
|
||||
|
||||
/**
|
||||
* Manages a queue of jobs with dynamic sizing based on system resources. Used when system resources
|
||||
* are limited to prevent overloading.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class JobQueue implements SmartLifecycle {
|
||||
|
||||
private volatile boolean running = false;
|
||||
|
||||
private final ResourceMonitor resourceMonitor;
|
||||
|
||||
@Value("${stirling.job.queue.base-capacity:10}")
|
||||
private int baseQueueCapacity = 10;
|
||||
|
||||
@Value("${stirling.job.queue.min-capacity:2}")
|
||||
private int minQueueCapacity = 2;
|
||||
|
||||
@Value("${stirling.job.queue.check-interval-ms:1000}")
|
||||
private long queueCheckIntervalMs = 1000;
|
||||
|
||||
@Value("${stirling.job.queue.max-wait-time-ms:600000}")
|
||||
private long maxWaitTimeMs = 600000; // 10 minutes
|
||||
|
||||
private volatile BlockingQueue<QueuedJob> jobQueue;
|
||||
private final Map<String, QueuedJob> jobMap = new ConcurrentHashMap<>();
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
private final ExecutorService jobExecutor = ExecutorFactory.newVirtualOrCachedThreadExecutor();
|
||||
private final Object queueLock = new Object(); // Lock for synchronizing queue operations
|
||||
|
||||
private boolean shuttingDown = false;
|
||||
|
||||
@Getter private int rejectedJobs = 0;
|
||||
|
||||
@Getter private int totalQueuedJobs = 0;
|
||||
|
||||
@Getter private int currentQueueSize = 0;
|
||||
|
||||
/** Represents a job waiting in the queue. */
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
private static class QueuedJob {
|
||||
private final String jobId;
|
||||
private final int resourceWeight;
|
||||
private final Supplier<Object> work;
|
||||
private final long timeoutMs;
|
||||
private final Instant queuedAt;
|
||||
private CompletableFuture<ResponseEntity<?>> future;
|
||||
private volatile boolean cancelled = false;
|
||||
}
|
||||
|
||||
public JobQueue(ResourceMonitor resourceMonitor) {
|
||||
this.resourceMonitor = resourceMonitor;
|
||||
|
||||
// Initialize with dynamic capacity
|
||||
int capacity =
|
||||
resourceMonitor.calculateDynamicQueueCapacity(baseQueueCapacity, minQueueCapacity);
|
||||
this.jobQueue = new LinkedBlockingQueue<>(capacity);
|
||||
}
|
||||
|
||||
// Remove @PostConstruct to let SmartLifecycle control startup
|
||||
private void initializeSchedulers() {
|
||||
log.debug(
|
||||
"Starting job queue with base capacity {}, min capacity {}",
|
||||
baseQueueCapacity,
|
||||
minQueueCapacity);
|
||||
|
||||
// Periodically process the job queue
|
||||
scheduler.scheduleWithFixedDelay(
|
||||
this::processQueue, 0, queueCheckIntervalMs, TimeUnit.MILLISECONDS);
|
||||
|
||||
// Periodically update queue capacity based on resource usage
|
||||
scheduler.scheduleWithFixedDelay(
|
||||
this::updateQueueCapacity,
|
||||
10000, // Initial delay
|
||||
30000, // 30 second interval
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// Remove @PreDestroy to let SmartLifecycle control shutdown
|
||||
private void shutdownSchedulers() {
|
||||
log.info("Shutting down job queue");
|
||||
shuttingDown = true;
|
||||
|
||||
// Complete any futures that are still waiting
|
||||
jobMap.forEach(
|
||||
(id, job) -> {
|
||||
if (!job.future.isDone()) {
|
||||
job.future.completeExceptionally(
|
||||
new RuntimeException("Server shutting down, job cancelled"));
|
||||
}
|
||||
});
|
||||
|
||||
// Shutdown schedulers and wait for termination
|
||||
try {
|
||||
scheduler.shutdown();
|
||||
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
|
||||
jobExecutor.shutdown();
|
||||
if (!jobExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
jobExecutor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
scheduler.shutdownNow();
|
||||
jobExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
log.info(
|
||||
"Job queue shutdown complete. Stats: total={}, rejected={}",
|
||||
totalQueuedJobs,
|
||||
rejectedJobs);
|
||||
}
|
||||
|
||||
// SmartLifecycle methods
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
log.info("Starting JobQueue lifecycle");
|
||||
if (!running) {
|
||||
initializeSchedulers();
|
||||
running = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
log.info("Stopping JobQueue lifecycle");
|
||||
shutdownSchedulers();
|
||||
running = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPhase() {
|
||||
// Start earlier than most components, but shutdown later
|
||||
return 10;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoStartup() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues a job for execution when resources permit.
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @param resourceWeight The resource weight of the job (1-100)
|
||||
* @param work The work to be done
|
||||
* @param timeoutMs The timeout in milliseconds
|
||||
* @return A CompletableFuture that will complete when the job is executed
|
||||
*/
|
||||
public CompletableFuture<ResponseEntity<?>> queueJob(
|
||||
String jobId, int resourceWeight, Supplier<Object> work, long timeoutMs) {
|
||||
|
||||
// Create a CompletableFuture to track this job's completion
|
||||
CompletableFuture<ResponseEntity<?>> future = new CompletableFuture<>();
|
||||
|
||||
// Create the queued job
|
||||
QueuedJob job =
|
||||
new QueuedJob(jobId, resourceWeight, work, timeoutMs, Instant.now(), future, false);
|
||||
|
||||
// Store in our map for lookup
|
||||
jobMap.put(jobId, job);
|
||||
|
||||
// Update stats
|
||||
totalQueuedJobs++;
|
||||
|
||||
// Synchronize access to the queue
|
||||
synchronized (queueLock) {
|
||||
currentQueueSize = jobQueue.size();
|
||||
|
||||
// Try to add to the queue
|
||||
try {
|
||||
boolean added = jobQueue.offer(job, 5, TimeUnit.SECONDS);
|
||||
if (!added) {
|
||||
log.warn("Queue full, rejecting job {}", jobId);
|
||||
rejectedJobs++;
|
||||
future.completeExceptionally(
|
||||
new RuntimeException("Job queue full, please try again later"));
|
||||
jobMap.remove(jobId);
|
||||
return future;
|
||||
}
|
||||
|
||||
log.debug(
|
||||
"Job {} queued for execution (weight: {}, queue size: {})",
|
||||
jobId,
|
||||
resourceWeight,
|
||||
jobQueue.size());
|
||||
|
||||
return future;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
future.completeExceptionally(new RuntimeException("Job queue interrupted"));
|
||||
jobMap.remove(jobId);
|
||||
return future;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current capacity of the job queue.
|
||||
*
|
||||
* @return The current capacity
|
||||
*/
|
||||
public int getQueueCapacity() {
|
||||
synchronized (queueLock) {
|
||||
return ((LinkedBlockingQueue<QueuedJob>) jobQueue).remainingCapacity()
|
||||
+ jobQueue.size();
|
||||
}
|
||||
}
|
||||
|
||||
/** Updates the capacity of the job queue based on available system resources. */
|
||||
private void updateQueueCapacity() {
|
||||
try {
|
||||
// Calculate new capacity once and cache the result
|
||||
int newCapacity =
|
||||
resourceMonitor.calculateDynamicQueueCapacity(
|
||||
baseQueueCapacity, minQueueCapacity);
|
||||
|
||||
int currentCapacity = getQueueCapacity();
|
||||
if (newCapacity != currentCapacity) {
|
||||
log.debug(
|
||||
"Updating job queue capacity from {} to {}", currentCapacity, newCapacity);
|
||||
|
||||
synchronized (queueLock) {
|
||||
// Double-check that capacity still needs to be updated
|
||||
// Use the cached currentCapacity to avoid calling getQueueCapacity() again
|
||||
if (newCapacity != currentCapacity) {
|
||||
// Create new queue with updated capacity
|
||||
BlockingQueue<QueuedJob> newQueue = new LinkedBlockingQueue<>(newCapacity);
|
||||
|
||||
// Transfer jobs from old queue to new queue
|
||||
jobQueue.drainTo(newQueue);
|
||||
jobQueue = newQueue;
|
||||
|
||||
currentQueueSize = jobQueue.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error updating queue capacity: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Processes jobs in the queue, executing them when resources permit. */
|
||||
private void processQueue() {
|
||||
// Jobs to execute after releasing the lock
|
||||
java.util.List<QueuedJob> jobsToExecute = new java.util.ArrayList<>();
|
||||
|
||||
// First synchronized block: poll jobs from the queue and prepare them for execution
|
||||
synchronized (queueLock) {
|
||||
if (shuttingDown || jobQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Get current resource status
|
||||
ResourceMonitor.ResourceStatus status = resourceMonitor.getCurrentStatus().get();
|
||||
|
||||
// Check if we should execute any jobs
|
||||
boolean canExecuteJobs = (status != ResourceMonitor.ResourceStatus.CRITICAL);
|
||||
|
||||
if (!canExecuteJobs) {
|
||||
// Under critical load, don't execute any jobs
|
||||
log.debug("System under critical load, delaying job execution");
|
||||
return;
|
||||
}
|
||||
|
||||
// Get jobs from the queue, up to a limit based on resource availability
|
||||
int jobsToProcess =
|
||||
Math.max(
|
||||
1,
|
||||
switch (status) {
|
||||
case OK -> 3;
|
||||
case WARNING -> 1;
|
||||
case CRITICAL -> 0;
|
||||
});
|
||||
|
||||
for (int i = 0; i < jobsToProcess && !jobQueue.isEmpty(); i++) {
|
||||
QueuedJob job = jobQueue.poll();
|
||||
if (job == null) break;
|
||||
|
||||
// Check if it's been waiting too long
|
||||
long waitTimeMs = Instant.now().toEpochMilli() - job.queuedAt.toEpochMilli();
|
||||
if (waitTimeMs > maxWaitTimeMs) {
|
||||
log.warn(
|
||||
"Job {} exceeded maximum wait time ({} ms), executing anyway",
|
||||
job.jobId,
|
||||
waitTimeMs);
|
||||
|
||||
// Add a specific status to the job context that can be tracked
|
||||
// This will be visible in the job status API
|
||||
try {
|
||||
TaskManager taskManager =
|
||||
SpringContextHolder.getBean(TaskManager.class);
|
||||
if (taskManager != null) {
|
||||
taskManager.addNote(
|
||||
job.jobId,
|
||||
"QUEUED_TIMEOUT: Job waited in queue for "
|
||||
+ (waitTimeMs / 1000)
|
||||
+ " seconds, exceeding the maximum wait time of "
|
||||
+ (maxWaitTimeMs / 1000)
|
||||
+ " seconds.");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error(
|
||||
"Failed to add timeout note to job {}: {}",
|
||||
job.jobId,
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from our map
|
||||
jobMap.remove(job.jobId);
|
||||
currentQueueSize = jobQueue.size();
|
||||
|
||||
// Add to the list of jobs to execute outside the synchronized block
|
||||
jobsToExecute.add(job);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error processing job queue: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// Now execute the jobs outside the synchronized block to avoid holding the lock
|
||||
for (QueuedJob job : jobsToExecute) {
|
||||
executeJob(job);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a job from the queue.
|
||||
*
|
||||
* @param job The job to execute
|
||||
*/
|
||||
private void executeJob(QueuedJob job) {
|
||||
if (job.cancelled) {
|
||||
log.debug("Job {} was cancelled, not executing", job.jobId);
|
||||
return;
|
||||
}
|
||||
|
||||
jobExecutor.execute(
|
||||
() -> {
|
||||
log.debug("Executing queued job {} (queued at {})", job.jobId, job.queuedAt);
|
||||
|
||||
try {
|
||||
// Execute with timeout
|
||||
Object result = executeWithTimeout(job.work, job.timeoutMs);
|
||||
|
||||
// Process the result
|
||||
if (result instanceof ResponseEntity) {
|
||||
job.future.complete((ResponseEntity<?>) result);
|
||||
} else {
|
||||
job.future.complete(ResponseEntity.ok(result));
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error(
|
||||
"Error executing queued job {}: {}", job.jobId, e.getMessage(), e);
|
||||
job.future.completeExceptionally(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a supplier with a timeout.
|
||||
*
|
||||
* @param supplier The supplier to execute
|
||||
* @param timeoutMs The timeout in milliseconds
|
||||
* @return The result from the supplier
|
||||
* @throws Exception If there is an execution error
|
||||
*/
|
||||
private <T> T executeWithTimeout(Supplier<T> supplier, long timeoutMs) throws Exception {
|
||||
CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
|
||||
|
||||
try {
|
||||
if (timeoutMs <= 0) {
|
||||
// No timeout
|
||||
return future.join();
|
||||
} else {
|
||||
// With timeout
|
||||
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
future.cancel(true);
|
||||
throw new TimeoutException("Job timed out after " + timeoutMs + "ms");
|
||||
} catch (ExecutionException e) {
|
||||
throw (Exception) e.getCause();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new InterruptedException("Job was interrupted");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a job is queued.
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @return true if the job is queued
|
||||
*/
|
||||
public boolean isJobQueued(String jobId) {
|
||||
return jobMap.containsKey(jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current position of a job in the queue.
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @return The position (0-based) or -1 if not found
|
||||
*/
|
||||
public int getJobPosition(String jobId) {
|
||||
if (!jobMap.containsKey(jobId)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Count positions
|
||||
int position = 0;
|
||||
for (QueuedJob job : jobQueue) {
|
||||
if (job.jobId.equals(jobId)) {
|
||||
return position;
|
||||
}
|
||||
position++;
|
||||
}
|
||||
|
||||
// If we didn't find it in the queue but it's in the map,
|
||||
// it might be executing already
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels a queued job.
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @return true if the job was cancelled, false if not found
|
||||
*/
|
||||
public boolean cancelJob(String jobId) {
|
||||
QueuedJob job = jobMap.remove(jobId);
|
||||
if (job != null) {
|
||||
job.cancelled = true;
|
||||
job.future.completeExceptionally(new RuntimeException("Job cancelled by user"));
|
||||
|
||||
// Try to remove from queue if it's still there
|
||||
jobQueue.remove(job);
|
||||
currentQueueSize = jobQueue.size();
|
||||
|
||||
log.debug("Job {} cancelled", jobId);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get queue statistics.
|
||||
*
|
||||
* @return A map containing queue statistics
|
||||
*/
|
||||
public Map<String, Object> getQueueStats() {
|
||||
return Map.of(
|
||||
"queuedJobs", jobQueue.size(),
|
||||
"queueCapacity", getQueueCapacity(),
|
||||
"totalQueuedJobs", totalQueuedJobs,
|
||||
"rejectedJobs", rejectedJobs,
|
||||
"resourceStatus", resourceMonitor.getCurrentStatus().get().name());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,277 @@
|
||||
package stirling.software.common.service;
|
||||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.MemoryMXBean;
|
||||
import java.lang.management.OperatingSystemMXBean;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Monitors system resources (CPU, memory) to inform job scheduling decisions. Provides information
|
||||
* about available resources to prevent overloading the system.
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class ResourceMonitor {
|
||||
|
||||
@Value("${stirling.resource.memory.critical-threshold:0.9}")
|
||||
private double memoryCriticalThreshold = 0.9; // 90% usage is critical
|
||||
|
||||
@Value("${stirling.resource.memory.high-threshold:0.75}")
|
||||
private double memoryHighThreshold = 0.75; // 75% usage is high
|
||||
|
||||
@Value("${stirling.resource.cpu.critical-threshold:0.9}")
|
||||
private double cpuCriticalThreshold = 0.9; // 90% usage is critical
|
||||
|
||||
@Value("${stirling.resource.cpu.high-threshold:0.75}")
|
||||
private double cpuHighThreshold = 0.75; // 75% usage is high
|
||||
|
||||
@Value("${stirling.resource.monitor.interval-ms:60000}")
|
||||
private long monitorIntervalMs = 60000; // 60 seconds
|
||||
|
||||
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
|
||||
private final OperatingSystemMXBean osMXBean = ManagementFactory.getOperatingSystemMXBean();
|
||||
|
||||
@Getter
|
||||
private final AtomicReference<ResourceStatus> currentStatus =
|
||||
new AtomicReference<>(ResourceStatus.OK);
|
||||
|
||||
@Getter
|
||||
private final AtomicReference<ResourceMetrics> latestMetrics =
|
||||
new AtomicReference<>(new ResourceMetrics());
|
||||
|
||||
/** Represents the current status of system resources. */
|
||||
public enum ResourceStatus {
|
||||
/** Resources are available, normal operations can proceed */
|
||||
OK,
|
||||
|
||||
/** Resources are under strain, consider queueing high-resource operations */
|
||||
WARNING,
|
||||
|
||||
/** Resources are critically low, queue all operations */
|
||||
CRITICAL
|
||||
}
|
||||
|
||||
/** Detailed metrics about system resources. */
|
||||
@Getter
|
||||
public static class ResourceMetrics {
|
||||
private final double cpuUsage;
|
||||
private final double memoryUsage;
|
||||
private final long freeMemoryBytes;
|
||||
private final long totalMemoryBytes;
|
||||
private final long maxMemoryBytes;
|
||||
private final Instant timestamp;
|
||||
|
||||
public ResourceMetrics() {
|
||||
this(0, 0, 0, 0, 0, Instant.now());
|
||||
}
|
||||
|
||||
public ResourceMetrics(
|
||||
double cpuUsage,
|
||||
double memoryUsage,
|
||||
long freeMemoryBytes,
|
||||
long totalMemoryBytes,
|
||||
long maxMemoryBytes,
|
||||
Instant timestamp) {
|
||||
this.cpuUsage = cpuUsage;
|
||||
this.memoryUsage = memoryUsage;
|
||||
this.freeMemoryBytes = freeMemoryBytes;
|
||||
this.totalMemoryBytes = totalMemoryBytes;
|
||||
this.maxMemoryBytes = maxMemoryBytes;
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the age of these metrics.
|
||||
*
|
||||
* @return Duration since these metrics were collected
|
||||
*/
|
||||
public Duration getAge() {
|
||||
return Duration.between(timestamp, Instant.now());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if these metrics are stale (older than threshold).
|
||||
*
|
||||
* @param thresholdMs Staleness threshold in milliseconds
|
||||
* @return true if metrics are stale
|
||||
*/
|
||||
public boolean isStale(long thresholdMs) {
|
||||
return getAge().toMillis() > thresholdMs;
|
||||
}
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void initialize() {
|
||||
log.debug("Starting resource monitoring with interval of {}ms", monitorIntervalMs);
|
||||
scheduler.scheduleAtFixedRate(
|
||||
this::updateResourceMetrics, 0, monitorIntervalMs, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
log.info("Shutting down resource monitoring");
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
|
||||
/** Updates the resource metrics by sampling current system state. */
|
||||
private void updateResourceMetrics() {
|
||||
try {
|
||||
// Get CPU usage
|
||||
double cpuUsage = osMXBean.getSystemLoadAverage() / osMXBean.getAvailableProcessors();
|
||||
if (cpuUsage < 0) cpuUsage = getAlternativeCpuLoad(); // Fallback if not available
|
||||
|
||||
// Get memory usage
|
||||
long heapUsed = memoryMXBean.getHeapMemoryUsage().getUsed();
|
||||
long nonHeapUsed = memoryMXBean.getNonHeapMemoryUsage().getUsed();
|
||||
long totalUsed = heapUsed + nonHeapUsed;
|
||||
|
||||
long maxMemory = Runtime.getRuntime().maxMemory();
|
||||
long totalMemory = Runtime.getRuntime().totalMemory();
|
||||
long freeMemory = Runtime.getRuntime().freeMemory();
|
||||
|
||||
double memoryUsage = (double) totalUsed / maxMemory;
|
||||
|
||||
// Create new metrics
|
||||
ResourceMetrics metrics =
|
||||
new ResourceMetrics(
|
||||
cpuUsage,
|
||||
memoryUsage,
|
||||
freeMemory,
|
||||
totalMemory,
|
||||
maxMemory,
|
||||
Instant.now());
|
||||
latestMetrics.set(metrics);
|
||||
|
||||
// Determine system status
|
||||
ResourceStatus newStatus;
|
||||
if (cpuUsage > cpuCriticalThreshold || memoryUsage > memoryCriticalThreshold) {
|
||||
newStatus = ResourceStatus.CRITICAL;
|
||||
} else if (cpuUsage > cpuHighThreshold || memoryUsage > memoryHighThreshold) {
|
||||
newStatus = ResourceStatus.WARNING;
|
||||
} else {
|
||||
newStatus = ResourceStatus.OK;
|
||||
}
|
||||
|
||||
// Update status if it changed
|
||||
ResourceStatus oldStatus = currentStatus.getAndSet(newStatus);
|
||||
if (oldStatus != newStatus) {
|
||||
log.info("System resource status changed from {} to {}", oldStatus, newStatus);
|
||||
log.info(
|
||||
"Current metrics - CPU: {}%, Memory: {}%, Free Memory: {} MB",
|
||||
String.format("%.1f", cpuUsage * 100), String.format("%.1f", memoryUsage * 100), freeMemory / (1024 * 1024));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error updating resource metrics: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Alternative method to estimate CPU load if getSystemLoadAverage() is not available. This is a
|
||||
* fallback and less accurate than the official JMX method.
|
||||
*
|
||||
* @return Estimated CPU load as a value between 0.0 and 1.0
|
||||
*/
|
||||
private double getAlternativeCpuLoad() {
|
||||
try {
|
||||
// Try to get CPU time if available through reflection
|
||||
// This is a fallback since we can't directly cast to platform-specific classes
|
||||
try {
|
||||
java.lang.reflect.Method m =
|
||||
osMXBean.getClass().getDeclaredMethod("getProcessCpuLoad");
|
||||
m.setAccessible(true);
|
||||
return (double) m.invoke(osMXBean);
|
||||
} catch (Exception e) {
|
||||
// Try the older method
|
||||
try {
|
||||
java.lang.reflect.Method m =
|
||||
osMXBean.getClass().getDeclaredMethod("getSystemCpuLoad");
|
||||
m.setAccessible(true);
|
||||
return (double) m.invoke(osMXBean);
|
||||
} catch (Exception e2) {
|
||||
log.trace(
|
||||
"Could not get CPU load through reflection, assuming moderate load (0.5)");
|
||||
return 0.5;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.trace("Could not get CPU load, assuming moderate load (0.5)");
|
||||
return 0.5; // Default to moderate load
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the dynamic job queue capacity based on current resource usage.
|
||||
*
|
||||
* @param baseCapacity The base capacity when system is under minimal load
|
||||
* @param minCapacity The minimum capacity to maintain even under high load
|
||||
* @return The calculated job queue capacity
|
||||
*/
|
||||
public int calculateDynamicQueueCapacity(int baseCapacity, int minCapacity) {
|
||||
ResourceMetrics metrics = latestMetrics.get();
|
||||
ResourceStatus status = currentStatus.get();
|
||||
|
||||
// Simple linear reduction based on memory and CPU load
|
||||
double capacityFactor =
|
||||
switch (status) {
|
||||
case OK -> 1.0;
|
||||
case WARNING -> 0.6;
|
||||
case CRITICAL -> 0.3;
|
||||
};
|
||||
|
||||
// Apply additional reduction based on specific memory pressure
|
||||
if (metrics.memoryUsage > 0.8) {
|
||||
capacityFactor *= 0.5; // Further reduce capacity under memory pressure
|
||||
}
|
||||
|
||||
// Calculate capacity with minimum safeguard
|
||||
int capacity = (int) Math.max(minCapacity, Math.ceil(baseCapacity * capacityFactor));
|
||||
|
||||
log.debug(
|
||||
"Dynamic queue capacity: {} (base: {}, factor: {:.2f}, status: {})",
|
||||
capacity,
|
||||
baseCapacity,
|
||||
capacityFactor,
|
||||
status);
|
||||
|
||||
return capacity;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if a job with the given weight can be executed immediately or should be queued based
|
||||
* on current resource availability.
|
||||
*
|
||||
* @param resourceWeight The resource weight of the job (1-100)
|
||||
* @return true if the job should be queued, false if it can run immediately
|
||||
*/
|
||||
public boolean shouldQueueJob(int resourceWeight) {
|
||||
ResourceStatus status = currentStatus.get();
|
||||
|
||||
// Always run lightweight jobs (weight < 20) unless critical
|
||||
if (resourceWeight < 20 && status != ResourceStatus.CRITICAL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Medium weight jobs run immediately if resources are OK
|
||||
if (resourceWeight < 60 && status == ResourceStatus.OK) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Heavy jobs (weight >= 60) and any job during WARNING/CRITICAL should be queued
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,293 @@
|
||||
package stirling.software.common.service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import jakarta.annotation.PreDestroy;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import stirling.software.common.model.job.JobResult;
|
||||
import stirling.software.common.model.job.JobStats;
|
||||
|
||||
/** Manages async tasks and their results */
|
||||
@Service
|
||||
@Slf4j
|
||||
public class TaskManager {
|
||||
private final Map<String, JobResult> jobResults = new ConcurrentHashMap<>();
|
||||
|
||||
@Value("${stirling.jobResultExpiryMinutes:30}")
|
||||
private int jobResultExpiryMinutes = 30;
|
||||
|
||||
private final FileStorage fileStorage;
|
||||
private final ScheduledExecutorService cleanupExecutor =
|
||||
Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
/** Initialize the task manager and start the cleanup scheduler */
|
||||
public TaskManager(FileStorage fileStorage) {
|
||||
this.fileStorage = fileStorage;
|
||||
|
||||
// Schedule periodic cleanup of old job results
|
||||
cleanupExecutor.scheduleAtFixedRate(
|
||||
this::cleanupOldJobs,
|
||||
10, // Initial delay
|
||||
10, // Interval
|
||||
TimeUnit.MINUTES);
|
||||
|
||||
log.debug(
|
||||
"Task manager initialized with job result expiry of {} minutes",
|
||||
jobResultExpiryMinutes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new task with the given job ID
|
||||
*
|
||||
* @param jobId The job ID
|
||||
*/
|
||||
public void createTask(String jobId) {
|
||||
jobResults.put(jobId, JobResult.createNew(jobId));
|
||||
log.debug("Created task with job ID: {}", jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the result of a task as a general object
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @param result The result object
|
||||
*/
|
||||
public void setResult(String jobId, Object result) {
|
||||
JobResult jobResult = getOrCreateJobResult(jobId);
|
||||
jobResult.completeWithResult(result);
|
||||
log.debug("Set result for job ID: {}", jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the result of a task as a file
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @param fileId The file ID
|
||||
* @param originalFileName The original file name
|
||||
* @param contentType The content type of the file
|
||||
*/
|
||||
public void setFileResult(
|
||||
String jobId, String fileId, String originalFileName, String contentType) {
|
||||
JobResult jobResult = getOrCreateJobResult(jobId);
|
||||
jobResult.completeWithFile(fileId, originalFileName, contentType);
|
||||
log.debug("Set file result for job ID: {} with file ID: {}", jobId, fileId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set an error for a task
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @param error The error message
|
||||
*/
|
||||
public void setError(String jobId, String error) {
|
||||
JobResult jobResult = getOrCreateJobResult(jobId);
|
||||
jobResult.failWithError(error);
|
||||
log.debug("Set error for job ID: {}: {}", jobId, error);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a task as complete
|
||||
*
|
||||
* @param jobId The job ID
|
||||
*/
|
||||
public void setComplete(String jobId) {
|
||||
JobResult jobResult = getOrCreateJobResult(jobId);
|
||||
if (jobResult.getResult() == null
|
||||
&& jobResult.getFileId() == null
|
||||
&& jobResult.getError() == null) {
|
||||
// If no result or error has been set, mark it as complete with an empty result
|
||||
jobResult.completeWithResult("Task completed successfully");
|
||||
}
|
||||
log.debug("Marked job ID: {} as complete", jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a task is complete
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @return true if the task is complete, false otherwise
|
||||
*/
|
||||
public boolean isComplete(String jobId) {
|
||||
JobResult result = jobResults.get(jobId);
|
||||
return result != null && result.isComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the result of a task
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @return The result object, or null if the task doesn't exist or is not complete
|
||||
*/
|
||||
public JobResult getJobResult(String jobId) {
|
||||
return jobResults.get(jobId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a note to a task. Notes are informational messages that can be attached to a job for
|
||||
* tracking purposes.
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @param note The note to add
|
||||
* @return true if the note was added successfully, false if the job doesn't exist
|
||||
*/
|
||||
public boolean addNote(String jobId, String note) {
|
||||
JobResult jobResult = jobResults.get(jobId);
|
||||
if (jobResult != null) {
|
||||
jobResult.addNote(note);
|
||||
log.debug("Added note to job ID: {}: {}", jobId, note);
|
||||
return true;
|
||||
}
|
||||
log.warn("Attempted to add note to non-existent job ID: {}", jobId);
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get statistics about all jobs in the system
|
||||
*
|
||||
* @return Job statistics
|
||||
*/
|
||||
public JobStats getJobStats() {
|
||||
int totalJobs = jobResults.size();
|
||||
int activeJobs = 0;
|
||||
int completedJobs = 0;
|
||||
int failedJobs = 0;
|
||||
int successfulJobs = 0;
|
||||
int fileResultJobs = 0;
|
||||
|
||||
LocalDateTime oldestActiveJobTime = null;
|
||||
LocalDateTime newestActiveJobTime = null;
|
||||
long totalProcessingTimeMs = 0;
|
||||
|
||||
for (JobResult result : jobResults.values()) {
|
||||
if (result.isComplete()) {
|
||||
completedJobs++;
|
||||
|
||||
// Calculate processing time for completed jobs
|
||||
if (result.getCreatedAt() != null && result.getCompletedAt() != null) {
|
||||
long processingTimeMs =
|
||||
java.time.Duration.between(
|
||||
result.getCreatedAt(), result.getCompletedAt())
|
||||
.toMillis();
|
||||
totalProcessingTimeMs += processingTimeMs;
|
||||
}
|
||||
|
||||
if (result.getError() != null) {
|
||||
failedJobs++;
|
||||
} else {
|
||||
successfulJobs++;
|
||||
if (result.getFileId() != null) {
|
||||
fileResultJobs++;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
activeJobs++;
|
||||
|
||||
// Track oldest and newest active jobs
|
||||
if (result.getCreatedAt() != null) {
|
||||
if (oldestActiveJobTime == null
|
||||
|| result.getCreatedAt().isBefore(oldestActiveJobTime)) {
|
||||
oldestActiveJobTime = result.getCreatedAt();
|
||||
}
|
||||
|
||||
if (newestActiveJobTime == null
|
||||
|| result.getCreatedAt().isAfter(newestActiveJobTime)) {
|
||||
newestActiveJobTime = result.getCreatedAt();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate average processing time
|
||||
long averageProcessingTimeMs =
|
||||
completedJobs > 0 ? totalProcessingTimeMs / completedJobs : 0;
|
||||
|
||||
return JobStats.builder()
|
||||
.totalJobs(totalJobs)
|
||||
.activeJobs(activeJobs)
|
||||
.completedJobs(completedJobs)
|
||||
.failedJobs(failedJobs)
|
||||
.successfulJobs(successfulJobs)
|
||||
.fileResultJobs(fileResultJobs)
|
||||
.oldestActiveJobTime(oldestActiveJobTime)
|
||||
.newestActiveJobTime(newestActiveJobTime)
|
||||
.averageProcessingTimeMs(averageProcessingTimeMs)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get or create a job result
|
||||
*
|
||||
* @param jobId The job ID
|
||||
* @return The job result
|
||||
*/
|
||||
private JobResult getOrCreateJobResult(String jobId) {
|
||||
return jobResults.computeIfAbsent(jobId, JobResult::createNew);
|
||||
}
|
||||
|
||||
/** Clean up old completed job results */
|
||||
public void cleanupOldJobs() {
|
||||
LocalDateTime expiryThreshold =
|
||||
LocalDateTime.now().minus(jobResultExpiryMinutes, ChronoUnit.MINUTES);
|
||||
int removedCount = 0;
|
||||
|
||||
try {
|
||||
for (Map.Entry<String, JobResult> entry : jobResults.entrySet()) {
|
||||
JobResult result = entry.getValue();
|
||||
|
||||
// Remove completed jobs that are older than the expiry threshold
|
||||
if (result.isComplete()
|
||||
&& result.getCompletedAt() != null
|
||||
&& result.getCompletedAt().isBefore(expiryThreshold)) {
|
||||
|
||||
// If the job has a file result, delete the file
|
||||
if (result.getFileId() != null) {
|
||||
try {
|
||||
fileStorage.deleteFile(result.getFileId());
|
||||
} catch (Exception e) {
|
||||
log.warn(
|
||||
"Failed to delete file for job {}: {}",
|
||||
entry.getKey(),
|
||||
e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the job result
|
||||
jobResults.remove(entry.getKey());
|
||||
removedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (removedCount > 0) {
|
||||
log.info("Cleaned up {} expired job results", removedCount);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Error during job cleanup: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Shutdown the cleanup executor */
|
||||
@PreDestroy
|
||||
public void shutdown() {
|
||||
try {
|
||||
log.info("Shutting down job result cleanup executor");
|
||||
cleanupExecutor.shutdown();
|
||||
if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
cleanupExecutor.shutdownNow();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
cleanupExecutor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package stirling.software.common.util;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class ExecutorFactory {
|
||||
|
||||
/**
|
||||
* Creates an ExecutorService using virtual threads if available (Java 21+), or falls back to a
|
||||
* cached thread pool on older Java versions.
|
||||
*/
|
||||
public static ExecutorService newVirtualOrCachedThreadExecutor() {
|
||||
try {
|
||||
ExecutorService executor =
|
||||
(ExecutorService)
|
||||
Executors.class
|
||||
.getMethod("newVirtualThreadPerTaskExecutor")
|
||||
.invoke(null);
|
||||
return executor;
|
||||
} catch (NoSuchMethodException e) {
|
||||
log.debug("Virtual threads not available; falling back to cached thread pool.");
|
||||
} catch (Exception e) {
|
||||
log.debug("Error initializing virtual thread executor: {}", e.getMessage(), e);
|
||||
}
|
||||
|
||||
return Executors.newCachedThreadPool();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,82 @@
|
||||
package stirling.software.common.util;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Utility class to access Spring managed beans from non-Spring managed classes. This is especially
|
||||
* useful for classes that are instantiated by frameworks or created dynamically.
|
||||
*/
|
||||
@Component
|
||||
@Slf4j
|
||||
public class SpringContextHolder implements ApplicationContextAware {
|
||||
|
||||
private static ApplicationContext applicationContext;
|
||||
|
||||
@Override
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
SpringContextHolder.applicationContext = applicationContext;
|
||||
log.debug("Spring context holder initialized");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Spring bean by class type
|
||||
*
|
||||
* @param <T> The bean type
|
||||
* @param beanClass The bean class
|
||||
* @return The bean instance, or null if not found
|
||||
*/
|
||||
public static <T> T getBean(Class<T> beanClass) {
|
||||
if (applicationContext == null) {
|
||||
log.warn(
|
||||
"Application context not initialized when attempting to get bean of type {}",
|
||||
beanClass.getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return applicationContext.getBean(beanClass);
|
||||
} catch (BeansException e) {
|
||||
log.error("Error getting bean of type {}: {}", beanClass.getName(), e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Spring bean by name
|
||||
*
|
||||
* @param <T> The bean type
|
||||
* @param beanName The bean name
|
||||
* @return The bean instance, or null if not found
|
||||
*/
|
||||
public static <T> T getBean(String beanName) {
|
||||
if (applicationContext == null) {
|
||||
log.warn(
|
||||
"Application context not initialized when attempting to get bean '{}'",
|
||||
beanName);
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
T bean = (T) applicationContext.getBean(beanName);
|
||||
return bean;
|
||||
} catch (BeansException e) {
|
||||
log.error("Error getting bean '{}': {}", beanName, e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the application context is initialized
|
||||
*
|
||||
* @return true if initialized, false otherwise
|
||||
*/
|
||||
public static boolean isInitialized() {
|
||||
return applicationContext != null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user