feat(pipeline): improve file processing with resource management and temp file handling (#5488)

# Description of Changes

<!--
Please provide a summary of the changes, including:

- What was changed
- Why the change was made
- Any challenges encountered

Closes #(issue_number)
-->

---

## Checklist

### General

- [ ] I have read the [Contribution
Guidelines](https://github.com/Stirling-Tools/Stirling-PDF/blob/main/CONTRIBUTING.md)
- [ ] I have read the [Stirling-PDF Developer
Guide](https://github.com/Stirling-Tools/Stirling-PDF/blob/main/devGuide/DeveloperGuide.md)
(if applicable)
- [ ] I have read the [How to add new languages to
Stirling-PDF](https://github.com/Stirling-Tools/Stirling-PDF/blob/main/devGuide/HowToAddNewLanguage.md)
(if applicable)
- [ ] I have performed a self-review of my own code
- [ ] My changes generate no new warnings

### Documentation

- [ ] I have updated relevant docs on [Stirling-PDF's doc
repo](https://github.com/Stirling-Tools/Stirling-Tools.github.io/blob/main/docs/)
(if functionality has heavily changed)
- [ ] I have read the section [Add New Translation
Tags](https://github.com/Stirling-Tools/Stirling-PDF/blob/main/devGuide/HowToAddNewLanguage.md#add-new-translation-tags)
(for new translation tags only)

### Translations (if applicable)

- [ ] I ran
[`scripts/counter_translation.py`](https://github.com/Stirling-Tools/Stirling-PDF/blob/main/docs/counter_translation.md)

### UI Changes (if applicable)

- [ ] Screenshots or videos demonstrating the UI changes are attached
(e.g., as comments or direct attachments in the PR)

### Testing (if applicable)

- [ ] I have tested my changes locally. Refer to the [Testing
Guide](https://github.com/Stirling-Tools/Stirling-PDF/blob/main/devGuide/DeveloperGuide.md#6-testing)
for more details.

Signed-off-by: Balázs Szücs <bszucs1209@gmail.com>
This commit is contained in:
Balázs Szücs
2026-01-17 20:44:02 +01:00
committed by GitHub
parent 473021a13c
commit 59d816bfa7
4 changed files with 192 additions and 53 deletions

View File

@@ -1,9 +1,6 @@
package stirling.software.SPDF.controller.api.pipeline;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystemException;
import java.nio.file.FileVisitResult;
@@ -24,7 +21,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -164,7 +160,8 @@ public class PipelineDirectoryProcessor {
postHogService.captureEvent("pipeline_directory_event", properties);
List<File> filesToProcess = prepareFilesForProcessing(files, processingDir);
runPipelineAgainstFiles(filesToProcess, config, dir, processingDir);
try (PipelineResult result =
runPipelineAgainstFiles(filesToProcess, config, dir, processingDir)) {}
}
}
@@ -303,14 +300,14 @@ public class PipelineDirectoryProcessor {
}
}
private void runPipelineAgainstFiles(
private PipelineResult runPipelineAgainstFiles(
List<File> filesToProcess, PipelineConfig config, Path dir, Path processingDir)
throws IOException {
try {
List<Resource> inputFiles =
processor.generateInputFiles(filesToProcess.toArray(new File[0]));
if (inputFiles == null || inputFiles.isEmpty()) {
return;
return new PipelineResult();
}
PipelineResult result = processor.runPipelineAgainstFiles(inputFiles, config);
@@ -321,9 +318,11 @@ public class PipelineDirectoryProcessor {
moveAndRenameFiles(result.getOutputFiles(), config, dir);
deleteOriginalFiles(filesToProcess, processingDir);
}
return result;
} catch (Exception e) {
log.error("Error during processing", e);
moveFilesBack(filesToProcess, processingDir);
return new PipelineResult();
}
}
@@ -350,8 +349,9 @@ public class PipelineDirectoryProcessor {
log.info("Created directory: {}", outputPath);
}
Path outputFile = outputPath.resolve(outputFileName);
try (OutputStream os = new FileOutputStream(outputFile.toFile())) {
os.write(((ByteArrayResource) resource).getByteArray());
try (OutputStream os = new FileOutputStream(outputFile.toFile());
InputStream is = resource.getInputStream()) {
is.transferTo(os);
}
log.info("File moved and renamed to {}", outputFile);
}

View File

@@ -15,9 +15,10 @@ import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.http.*;
import org.springframework.http.converter.FormHttpMessageConverter;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@@ -38,6 +39,8 @@ import stirling.software.SPDF.model.PipelineResult;
import stirling.software.SPDF.service.ApiDocService;
import stirling.software.common.model.enumeration.Role;
import stirling.software.common.service.UserServiceInterface;
import stirling.software.common.util.TempFile;
import stirling.software.common.util.TempFileManager;
@Service
@Slf4j
@@ -49,13 +52,17 @@ public class PipelineProcessor {
private final ServletContext servletContext;
private final TempFileManager tempFileManager;
public PipelineProcessor(
ApiDocService apiDocService,
@Autowired(required = false) UserServiceInterface userService,
ServletContext servletContext) {
ServletContext servletContext,
TempFileManager tempFileManager) {
this.apiDocService = apiDocService;
this.userService = userService;
this.servletContext = servletContext;
this.tempFileManager = tempFileManager;
}
public static String removeTrailingNaming(String filename) {
@@ -137,14 +144,18 @@ public class PipelineProcessor {
body.add(entry.getKey(), entry.getValue());
}
}
ResponseEntity<byte[]> response = sendWebRequest(url, body);
ResponseEntity<Resource> response = sendWebRequest(url, body);
// If the operation is filter and the response body is null or empty,
// skip
// this
// file
if (response.getBody() instanceof TempFileResource tempFileResource) {
result.addTempFile(tempFileResource.getTempFile());
}
if (operation.startsWith("/api/v1/filter/filter-")
&& (response.getBody() == null
|| response.getBody().length == 0)) {
|| response.getBody().contentLength() == 0)) {
filtersApplied = true;
log.info("Skipping file due to filtering {}", operation);
continue;
@@ -154,7 +165,7 @@ public class PipelineProcessor {
hasErrors = true;
continue;
}
processOutputFiles(operation, response, newOutputFiles);
processOutputFiles(operation, response, newOutputFiles, result);
}
}
if (!hasInputFileType) {
@@ -215,10 +226,13 @@ public class PipelineProcessor {
body.add(entry.getKey(), entry.getValue());
}
}
ResponseEntity<byte[]> response = sendWebRequest(url, body);
ResponseEntity<Resource> response = sendWebRequest(url, body);
if (response.getBody() instanceof TempFileResource tempFileResource) {
result.addTempFile(tempFileResource.getTempFile());
}
// Handle the response
if (HttpStatus.OK.equals(response.getStatusCode())) {
processOutputFiles(operation, response, newOutputFiles);
processOutputFiles(operation, response, newOutputFiles, result);
} else {
// Log error if the response status is not OK
logPrintStream.println(
@@ -267,7 +281,7 @@ public class PipelineProcessor {
return result;
}
/* package */ ResponseEntity<byte[]> sendWebRequest(
/* package */ ResponseEntity<Resource> sendWebRequest(
String url, MultiValueMap<String, Object> body) {
RestTemplate restTemplate = new RestTemplate();
// Set up headers, including API key
@@ -275,14 +289,37 @@ public class PipelineProcessor {
String apiKey = getApiKeyForUser();
headers.add("X-API-KEY", apiKey);
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
// Create HttpEntity with the body and headers
HttpEntity<MultiValueMap<String, Object>> entity = new HttpEntity<>(body, headers);
// Make the request to the REST endpoint
return restTemplate.exchange(url, HttpMethod.POST, entity, byte[].class);
return restTemplate.execute(
url,
HttpMethod.POST,
request -> {
request.getHeaders().putAll(headers);
new FormHttpMessageConverter()
.write(body, MediaType.MULTIPART_FORM_DATA, request);
},
response -> {
try {
TempFile tempFile = tempFileManager.createManagedTempFile("pipeline");
Files.copy(
response.getBody(),
tempFile.getPath(),
java.nio.file.StandardCopyOption.REPLACE_EXISTING);
TempFileResource resource = new TempFileResource(tempFile);
return ResponseEntity.status(response.getStatusCode())
.headers(response.getHeaders())
.body(resource);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
}
private List<Resource> processOutputFiles(
String operation, ResponseEntity<byte[]> response, List<Resource> newOutputFiles)
String operation,
ResponseEntity<Resource> response,
List<Resource> newOutputFiles,
PipelineResult result)
throws IOException {
// Define filename
String newFilename;
@@ -298,10 +335,14 @@ public class PipelineProcessor {
// Check if the response body is a zip file
if (isZip(response.getBody(), newFilename)) {
// Unzip the file and add all the files to the new output files
newOutputFiles.addAll(unzip(response.getBody()));
newOutputFiles.addAll(unzip(response.getBody(), result));
} else {
final Resource tempResource = response.getBody();
if (tempResource instanceof TempFileResource) {
result.addTempFile(((TempFileResource) tempResource).getTempFile());
}
Resource outputResource =
new ByteArrayResource(response.getBody()) {
new FileSystemResource(tempResource.getFile()) {
@Override
public String getFilename() {
@@ -313,7 +354,7 @@ public class PipelineProcessor {
return newOutputFiles;
}
public String extractFilename(ResponseEntity<byte[]> response) {
public String extractFilename(ResponseEntity<Resource> response) {
// Default filename if not found
String filename = "default-filename.ext";
HttpHeaders headers = response.getHeaders();
@@ -348,14 +389,7 @@ public class PipelineProcessor {
// debug statement
log.info("Reading file: {}", path);
if (Files.exists(path)) {
Resource fileResource =
new ByteArrayResource(Files.readAllBytes(path)) {
@Override
public String getFilename() {
return file.getName();
}
};
Resource fileResource = new FileSystemResource(file);
outputFiles.add(fileResource);
} else {
log.info("File not found: {}", path);
@@ -372,8 +406,11 @@ public class PipelineProcessor {
}
List<Resource> outputFiles = new ArrayList<>();
for (MultipartFile file : files) {
Path tempFile = Files.createTempFile("SPDF-upload-", ".tmp");
file.transferTo(tempFile);
Resource fileResource =
new ByteArrayResource(file.getBytes()) {
new FileSystemResource(tempFile.toFile()) {
@Override
public String getFilename() {
@@ -386,8 +423,8 @@ public class PipelineProcessor {
return outputFiles;
}
private boolean isZip(byte[] data, String filename) {
if (data == null || data.length < 4) {
private boolean isZip(Resource data, String filename) throws IOException {
if (data == null || data.contentLength() < 4) {
return false;
}
if (filename != null) {
@@ -398,29 +435,41 @@ public class PipelineProcessor {
}
}
// Check the first four bytes of the data against the standard zip magic number
return data[0] == 0x50 && data[1] == 0x4B && data[2] == 0x03 && data[3] == 0x04;
try (InputStream is = data.getInputStream()) {
byte[] header = new byte[4];
if (is.read(header) < 4) {
return false;
}
return header[0] == 0x50 && header[1] == 0x4B && header[2] == 0x03 && header[3] == 0x04;
}
}
private boolean isZip(byte[] data) {
private boolean isZip(Resource data) throws IOException {
return isZip(data, null);
}
private List<Resource> unzip(byte[] data) throws IOException {
log.info("Unzipping data of length: {}", data.length);
private List<Resource> unzip(Resource data, PipelineResult result) throws IOException {
log.info("Unzipping data of length: {}", data.contentLength());
List<Resource> unzippedFiles = new ArrayList<>();
try (ByteArrayInputStream bais = new ByteArrayInputStream(data);
try (InputStream bais = data.getInputStream();
ZipInputStream zis = ZipSecurity.createHardenedInputStream(bais)) {
ZipEntry entry;
while ((entry = zis.getNextEntry()) != null) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[1024];
int count;
while ((count = zis.read(buffer)) != -1) {
baos.write(buffer, 0, count);
if (entry.isDirectory()) {
continue;
}
TempFile tempFile = tempFileManager.createManagedTempFile("unzip");
result.addTempFile(tempFile);
try (OutputStream os = Files.newOutputStream(tempFile.getPath())) {
byte[] buffer = new byte[4096];
int count;
while ((count = zis.read(buffer)) != -1) {
os.write(buffer, 0, count);
}
}
final String filename = entry.getName();
Resource fileResource =
new ByteArrayResource(baos.toByteArray()) {
new FileSystemResource(tempFile.getFile()) {
@Override
public String getFilename() {
@@ -428,9 +477,9 @@ public class PipelineProcessor {
}
};
// If the unzipped file is a zip file, unzip it
if (isZip(baos.toByteArray(), filename)) {
if (isZip(fileResource, filename)) {
log.info("File {} is a zip file. Unzipping...", filename);
unzippedFiles.addAll(unzip(baos.toByteArray()));
unzippedFiles.addAll(unzip(fileResource, result));
} else {
unzippedFiles.add(fileResource);
}
@@ -439,4 +488,17 @@ public class PipelineProcessor {
log.info("Unzipping completed. {} files were unzipped.", unzippedFiles.size());
return unzippedFiles;
}
/**
 * A {@link FileSystemResource} that remembers the {@link TempFile} it was created from, so code
 * holding the resource can get back to the managed temp file handle.
 */
private static class TempFileResource extends FileSystemResource {

    /** Handle of the temp file whose underlying file this resource points at. */
    private final TempFile backingTempFile;

    /**
     * Creates a resource over the file returned by {@code tempFile.getFile()}.
     *
     * @param tempFile the temp file handle to expose as a resource
     */
    public TempFileResource(TempFile tempFile) {
        super(tempFile.getFile());
        backingTempFile = tempFile;
    }

    /**
     * @return the temp file handle passed to the constructor
     */
    public TempFile getTempFile() {
        return this.backingTempFile;
    }
}
}

View File

@@ -1,14 +1,37 @@
package stirling.software.SPDF.model;
import java.util.ArrayList;
import java.util.List;
import org.springframework.core.io.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import stirling.software.common.util.TempFile;
@Data
public class PipelineResult {
@Slf4j
/**
 * Outcome of a pipeline run: the produced output resources, status flags, and the temp files
 * that were registered while producing them.
 *
 * <p>Closing the result closes every registered {@link TempFile}, so callers should use it in a
 * try-with-resources block (or call {@link #cleanup()}) once the output files have been consumed.
 */
public class PipelineResult implements AutoCloseable {
    /** Resources produced by the pipeline. */
    private List<Resource> outputFiles;

    /** Whether an error was recorded during the run. */
    private boolean hasErrors;

    /** Whether a filter operation was applied during the run. */
    private boolean filtersApplied;

    /** Temp files registered for this result; each is closed by {@link #close()}. */
    private List<TempFile> tempFiles = new ArrayList<>();

    /**
     * Registers a temp file to be closed together with this result.
     *
     * <p>Registering the same instance more than once is a no-op, so a temp file that is reported
     * from several places is still closed only once.
     *
     * @param tempFile the temp file to track
     */
    public void addTempFile(TempFile tempFile) {
        // Identity comparison on purpose: two distinct handles are always tracked separately.
        for (TempFile tracked : tempFiles) {
            if (tracked == tempFile) {
                return;
            }
        }
        tempFiles.add(tempFile);
    }

    /**
     * Closes every registered temp file and forgets them.
     *
     * <p>A failure while closing one file does not stop the remaining files from being closed.
     * After all files have been attempted, the first failure is rethrown with any later failures
     * attached as suppressed exceptions.
     */
    @Override
    public void close() {
        RuntimeException firstFailure = null;
        for (TempFile file : tempFiles) {
            try {
                file.close();
                log.debug("Deleted temp file: {}", file.getAbsolutePath());
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        tempFiles.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Alias for {@link #close()}. */
    public void cleanup() {
        close();
    }
}

View File

@@ -4,6 +4,8 @@ import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
@@ -13,6 +15,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@@ -24,6 +27,7 @@ import stirling.software.SPDF.model.PipelineOperation;
import stirling.software.SPDF.model.PipelineResult;
import stirling.software.SPDF.service.ApiDocService;
import stirling.software.common.service.UserServiceInterface;
import stirling.software.common.util.TempFileManager;
@ExtendWith(MockitoExtension.class)
class PipelineProcessorTest {
@@ -34,11 +38,16 @@ class PipelineProcessorTest {
@Mock ServletContext servletContext;
@Mock TempFileManager tempFileManager;
PipelineProcessor pipelineProcessor;
@BeforeEach
void setUp() {
pipelineProcessor = spy(new PipelineProcessor(apiDocService, userService, servletContext));
pipelineProcessor =
spy(
new PipelineProcessor(
apiDocService, userService, servletContext, tempFileManager));
}
@Test
@@ -59,12 +68,18 @@ class PipelineProcessorTest {
when(apiDocService.isValidOperation(eq("/api/v1/filter/filter-page-count"), anyMap()))
.thenReturn(true);
doReturn(new ResponseEntity<>(new byte[0], HttpStatus.OK))
// Use a FileSystemResource backed by a temp file to avoid FileNotFoundException
Path emptyTemp = Files.createTempFile("empty", ".tmp");
Resource emptyResource = new FileSystemResource(emptyTemp.toFile());
doReturn(new ResponseEntity<>(emptyResource, HttpStatus.OK))
.when(pipelineProcessor)
.sendWebRequest(anyString(), any());
PipelineResult result = pipelineProcessor.runPipelineAgainstFiles(files, config);
Files.deleteIfExists(emptyTemp);
assertTrue(
result.isFiltersApplied(),
"Filter flag should be true when operation filters file");
@@ -72,6 +87,45 @@ class PipelineProcessorTest {
assertTrue(result.getOutputFiles().isEmpty(), "Filtered file list should be empty");
}
/**
 * Runs a single-operation pipeline whose web request is stubbed to return a file-backed
 * resource, and checks that the request is sent once and no error is flagged.
 */
@Test
void testPipelineSuccessWithResource() throws Exception {
    PipelineOperation op = new PipelineOperation();
    op.setOperation("/api/v1/misc/compress");
    op.setParameters(Map.of());
    PipelineConfig config = new PipelineConfig();
    config.setOperations(List.of(op));

    Resource inputFile = new MyFileByteArrayResource();
    List<Resource> files = List.of(inputFile);

    Path tempPath = Files.createTempFile("test-output", ".pdf");
    // try/finally so the temp file is removed even when an assertion or the pipeline call fails.
    try {
        // writeString encodes as UTF-8, independent of the platform default charset.
        Files.writeString(tempPath, "processed_data");
        Resource outputResource =
                new FileSystemResource(tempPath.toFile()) {
                    @Override
                    public String getFilename() {
                        return "processed.pdf";
                    }
                };

        when(apiDocService.isMultiInput(anyString())).thenReturn(false);
        when(apiDocService.getExtensionTypes(anyBoolean(), anyString()))
                .thenReturn(List.of("pdf"));
        when(apiDocService.isValidOperation(anyString(), anyMap())).thenReturn(true);
        doReturn(new ResponseEntity<>(outputResource, HttpStatus.OK))
                .when(pipelineProcessor)
                .sendWebRequest(anyString(), any());

        // PipelineResult is AutoCloseable; close it so any temp files it tracks are released.
        try (PipelineResult result = pipelineProcessor.runPipelineAgainstFiles(files, config)) {
            verify(pipelineProcessor).sendWebRequest(anyString(), any());
            assertFalse(result.isHasErrors());
        }
    } finally {
        Files.deleteIfExists(tempPath);
    }
}
private static class MyFileByteArrayResource extends ByteArrayResource {
public MyFileByteArrayResource() {
super("data".getBytes());