From 801dcdb463faaf84538bbd2e0b209c3d21737a5b Mon Sep 17 00:00:00 2001
From: Danny Lau <kkdlau@connect.ust.hk>
Date: Sat, 25 May 2024 00:22:01 +0800
Subject: [PATCH] #1214 Only take files that are good for processing

---
 .../software/SPDF/config/AppConfig.java       |  26 +++
 .../pipeline/PipelineDirectoryProcessor.java  |  19 +-
 .../software/SPDF/utils/FileMonitor.java      | 162 ++++++++++++++++++
 3 files changed, 202 insertions(+), 5 deletions(-)
 create mode 100644 src/main/java/stirling/software/SPDF/utils/FileMonitor.java

diff --git a/src/main/java/stirling/software/SPDF/config/AppConfig.java b/src/main/java/stirling/software/SPDF/config/AppConfig.java
index 16618e1e..3723e4f8 100644
--- a/src/main/java/stirling/software/SPDF/config/AppConfig.java
+++ b/src/main/java/stirling/software/SPDF/config/AppConfig.java
@@ -2,8 +2,10 @@ package stirling.software.SPDF.config;
 
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Properties;
+import java.util.function.Predicate;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
@@ -108,4 +110,28 @@ public class AppConfig {
     public boolean missingActivSecurity() {
         return false;
     }
+
+    @Bean(name = "watchedFoldersDir")
+    public String watchedFoldersDir() {
+        return "./pipeline/watchedFolders/";
+    }
+
+    @Bean(name = "finishedFoldersDir")
+    public String finishedFoldersDir() {
+        return "./pipeline/finishedFolders/";
+    }
+
+    @Bean(name = "directoryFilter")
+    public Predicate<Path> processPDFOnlyFilter() {
+        return path -> {
+            if (Files.isDirectory(path)) {
+                return !path.toString()
+                        .contains(
+                                "processing");
+            } else {
+                String fileName = path.getFileName().toString();
+                return fileName.endsWith(".pdf");
+            }
+        };
+    }
 }
diff --git a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java
index c61b29e9..ce7e1b94 100644
--- a/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java
+++ b/src/main/java/stirling/software/SPDF/controller/api/pipeline/PipelineDirectoryProcessor.java
@@ -19,6 +19,7 @@ import java.util.stream.Stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.core.io.ByteArrayResource;
 import org.springframework.core.io.Resource;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -28,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import stirling.software.SPDF.model.PipelineConfig;
 import stirling.software.SPDF.model.PipelineOperation;
+import stirling.software.SPDF.utils.FileMonitor;
 
 @Service
 public class PipelineDirectoryProcessor {
@@ -35,11 +37,18 @@ public class PipelineDirectoryProcessor {
     private static final Logger logger = LoggerFactory.getLogger(PipelineDirectoryProcessor.class);
     @Autowired private ObjectMapper objectMapper;
     @Autowired private ApiDocService apiDocService;
-
-    final String watchedFoldersDir = "./pipeline/watchedFolders/";
-    final String finishedFoldersDir = "./pipeline/finishedFolders/";
-
     @Autowired PipelineProcessor processor;
+    @Autowired FileMonitor fileMonitor;
+
+    final String watchedFoldersDir;
+    final String finishedFoldersDir;
+
+    public PipelineDirectoryProcessor(
+            @Qualifier("watchedFoldersDir") String watchedFoldersDir,
+            @Qualifier("finishedFoldersDir") String finishedFoldersDir) {
+        this.watchedFoldersDir = watchedFoldersDir;
+        this.finishedFoldersDir = finishedFoldersDir;
+    }
 
     @Scheduled(fixedRate = 60000)
     public void scanFolders() {
@@ -130,7 +139,7 @@ public class PipelineDirectoryProcessor {
             throws IOException {
         try (Stream<Path> paths = Files.list(dir)) {
             if ("automated".equals(operation.getParameters().get("fileInput"))) {
-                return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile))
+                return paths.filter(path -> !Files.isDirectory(path) && !path.equals(jsonFile) && fileMonitor.isFileReadyForProcessing(path))
                         .map(Path::toFile)
                         .toArray(File[]::new);
             } else {
diff --git a/src/main/java/stirling/software/SPDF/utils/FileMonitor.java b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java
new file mode 100644
index 00000000..feff9c16
--- /dev/null
+++ b/src/main/java/stirling/software/SPDF/utils/FileMonitor.java
@@ -0,0 +1,162 @@
+package stirling.software.SPDF.utils;
+
+import static java.nio.file.StandardWatchEventKinds.*;
+
+import java.io.IOException;
+import java.nio.file.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+@Component
+public class FileMonitor {
+    private static final Logger logger = LoggerFactory.getLogger(FileMonitor.class);
+    private final Map<Path, WatchKey> path2KeyMapping;
+    private final Set<Path> newlyDiscoveredFiles;
+    private final ConcurrentHashMap.KeySetView<Path, Boolean> readyForProcessingFiles;
+    private final WatchService watchService;
+    private final Predicate<Path> pathFilter;
+    private Set<Path> stagingFiles;
+
+    /**
+     * @param rootDirectory the root directory to monitor
+     * @param pathFilter the filter to apply to the paths, return true if the path should be
+     *     monitored, false otherwise
+     * @throws IOException
+     */
+    @Autowired
+    public FileMonitor(
+            @Qualifier("watchedFoldersDir") String rootDirectory,
+            @Qualifier("directoryFilter") Predicate<Path> pathFilter)
+            throws IOException {
+        this.newlyDiscoveredFiles = new HashSet<>();
+        this.path2KeyMapping = new HashMap<>();
+        this.stagingFiles = new HashSet<>();
+        this.pathFilter = pathFilter;
+        this.readyForProcessingFiles = ConcurrentHashMap.newKeySet();
+        this.watchService = FileSystems.getDefault().newWatchService();
+
+        Path path = Path.of(rootDirectory);
+        recursivelyRegisterEntry(path);
+
+        logger.info("Created a new file tracker for directory: {}", rootDirectory);
+    }
+
+    private boolean shouldNotProcess(Path path) {
+        return !pathFilter.test(path);
+    }
+
+    private void recursivelyRegisterEntry(Path dir) throws IOException {
+        WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+        path2KeyMapping.put(dir, key);
+        logger.info("Registered directory: {}", dir);
+
+        try (Stream<Path> directoryVisitor = Files.walk(dir, 1)) {
+            final Iterator<Path> iterator = directoryVisitor.iterator();
+            while (iterator.hasNext()) {
+                Path path = iterator.next();
+                if (path.equals(dir) || shouldNotProcess(path)) continue;
+
+                if (Files.isDirectory(path)) {
+                    recursivelyRegisterEntry(path);
+                } else if (Files.isRegularFile(path)) {
+                    handleFileCreation(path);
+                }
+            }
+        }
+    }
+
+    @Scheduled(fixedRate = 5000)
+    public void trackFiles() {
+        /*
+         All files observed changes in the last iteration will be considered as staging files.
+         If those files are not modified in current iteration, they will be considered as ready for processing.
+        */
+        stagingFiles = new HashSet<>(newlyDiscoveredFiles);
+        readyForProcessingFiles.clear();
+        WatchKey key;
+        while ((key = watchService.poll()) != null) {
+            final Path watchingDir = (Path) key.watchable();
+            key.pollEvents()
+                    .forEach(
+                            (evt) -> {
+                                final Path path = (Path) evt.context();
+                                final WatchEvent.Kind<?> kind = evt.kind();
+                                if (shouldNotProcess(path)) return;
+
+                                try {
+                                    if (Files.isDirectory(path)) {
+                                        if (kind == ENTRY_CREATE) {
+                                            handleDirectoryCreation(path);
+                                        }
+                                        /*
+                                         we don't need to handle directory deletion or modification
+                                         - directory deletion will be handled by key.reset()
+                                         - directory modification indicates a new file creation or deletion, which is handled by below
+                                        */
+                                    }
+                                    Path relativePathFromRoot = watchingDir.resolve(path);
+                                    if (kind == ENTRY_CREATE) {
+                                        handleFileCreation(relativePathFromRoot);
+                                    } else if (kind == ENTRY_DELETE) {
+                                        handleFileRemoval(relativePathFromRoot);
+                                    } else if (kind == ENTRY_MODIFY) {
+                                        handleFileModification(relativePathFromRoot);
+                                    }
+                                } catch (Exception e) {
+                                    logger.error("Error while processing file: {}", path, e);
+                                }
+                            });
+
+            boolean isKeyValid = key.reset();
+            if (!isKeyValid) { // key is invalid when the directory itself is no longer exists
+                path2KeyMapping.remove((Path) key.watchable());
+                if (path2KeyMapping.isEmpty()) {
+                    logger.warn(
+                            "FileMonitor is not monitoring any directory, no even the root directory.");
+                }
+            }
+        }
+        readyForProcessingFiles.addAll(stagingFiles);
+    }
+
+    private void handleDirectoryCreation(Path dir) throws IOException {
+        WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
+        path2KeyMapping.put(dir, key);
+    }
+
+    private void handleFileRemoval(Path path) {
+        newlyDiscoveredFiles.remove(path);
+        stagingFiles.remove(path);
+    }
+
+    private void handleFileCreation(Path path) {
+        newlyDiscoveredFiles.add(path);
+        stagingFiles.remove(path);
+    }
+
+    private void handleFileModification(Path path) {
+        // the logic is the same
+        handleFileCreation(path);
+    }
+
+    /**
+     * Check if the file is ready for processing.
+     *
+     * <p>A file is ready for processing if it is not being modified for 5000ms.
+     *
+     * @param path the path of the file
+     * @return true if the file is ready for processing, false otherwise
+     */
+    public boolean isFileReadyForProcessing(Path path) {
+        return readyForProcessingFiles.contains(path);
+    }
+}