diff --git a/app/core/src/main/java/stirling/software/SPDF/service/telegram/TelegramPipelineBot.java b/app/core/src/main/java/stirling/software/SPDF/service/telegram/TelegramPipelineBot.java index f12f3bd31..cb93226bf 100644 --- a/app/core/src/main/java/stirling/software/SPDF/service/telegram/TelegramPipelineBot.java +++ b/app/core/src/main/java/stirling/software/SPDF/service/telegram/TelegramPipelineBot.java @@ -8,13 +8,11 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.attribute.FileTime; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.UUID; import java.util.stream.Stream; @@ -48,6 +46,15 @@ import stirling.software.common.model.ApplicationProperties; @ConditionalOnProperty(prefix = "telegram", name = "enabled", havingValue = "true") public class TelegramPipelineBot extends TelegramLongPollingBot { + private static final String CHAT_PRIVATE = "private"; + private static final String CHAT_GROUP = "group"; + private static final String CHAT_SUPERGROUP = "supergroup"; + private static final String CHAT_CHANNEL = "channel"; + + private static final List ALLOWED_MIME_TYPES = List.of("application/pdf"); + + private final Object pipelinePollMonitor = new Object(); + private final ApplicationProperties.Telegram telegramProperties; private final RuntimePathConfig runtimePathConfig; private final TelegramBotsApi telegramBotsApi; @@ -56,6 +63,7 @@ public class TelegramPipelineBot extends TelegramLongPollingBot { ApplicationProperties applicationProperties, RuntimePathConfig runtimePathConfig, TelegramBotsApi telegramBotsApi) { + super(applicationProperties.getTelegram().getBotToken()); this.telegramProperties = applicationProperties.getTelegram(); this.runtimePathConfig = runtimePathConfig; @@ -78,111 +86,35 @@ public class TelegramPipelineBot extends 
TelegramLongPollingBot { @Override public void onUpdateReceived(Update update) { - Message message = null; - - // 1) Regular messages - if (update.hasMessage()) { - message = update.getMessage(); - } - // 2) Channel posts - else if (update.hasChannelPost()) { - message = update.getChannelPost(); - } else { + Message message = extractMessage(update); + if (message == null) { return; } Chat chat = message.getChat(); - if (chat == null) { - return; - } - - String chatType = chat.getType(); - if (!Objects.equals(chatType, "private") - && !Objects.equals(chatType, "group") - && !Objects.equals(chatType, "supergroup") - && !Objects.equals(chatType, "channel")) { + if (chat == null || !isSupportedChatType(chat.getType())) { log.debug( - "Ignoring message {} in chat {} with unsupported chat type {}", + "Ignoring message {}, unsupported chat type {}", message.getMessageId(), - chat.getId(), - chatType); + chat != null ? chat.getType() : "null"); return; } - log.info( - "Received message {} in chat {} (type={}) message {}", - message.getMessageId(), - chat.getId(), - chatType, - message); + if (!isAuthorized(message, chat)) { + return; + } - if (telegramProperties.getEnableAllowUserIDs() - || telegramProperties.getEnableAllowChannelIDs()) { - List allowUserIDs = telegramProperties.getAllowUserIDs(); - List allowChannelIDs = telegramProperties.getAllowChannelIDs(); - switch (chatType) { - case "channel" -> { - // In channels, messages are always sent on behalf of the channel - if (telegramProperties.getEnableAllowChannelIDs()) { - Chat senderChat = message.getSenderChat(); - if ((senderChat == null || !allowChannelIDs.contains(senderChat.getId())) - && !allowChannelIDs.isEmpty()) { - log.info( - "Ignoring message {} from user id={} in private chat id={} due" - + " to channel access restrictions", - message.getMessageId(), - senderChat != null ? senderChat.getId() : "unknown", - chat.getId()); - sendMessage( - chat.getId(), - "This channel is not authorized to use this bot. 
Please contact" - + " the administrator."); - return; - } - if (allowChannelIDs.isEmpty()) { - // All channels are allowed, but log a warning - log.warn( - "No allowed channel IDs configured, allowing all channels" - + " access. Channel with id={} sent a message in chat" - + " id={}", - senderChat != null ? senderChat.getId() : "unknown", - chat.getId()); - } - } - } - case "private" -> { - // In private chats, messages are sent by users - if (telegramProperties.getEnableAllowUserIDs()) { - User from = message.getFrom(); - if ((from == null || !allowUserIDs.contains(from.getId())) - && !allowUserIDs.isEmpty()) { - log.info( - "Ignoring message {} from channel id={} due to user access" - + " restrictions", - message.getMessageId(), - chat.getId()); - sendMessage( - chat.getId(), - "You are not authorized to use this bot. Please contact the" - + " administrator."); - return; - } - if (allowUserIDs.isEmpty()) { - // All users are allowed, but log a warning - log.warn( - "No allowed user IDs configured, allowing all users access." - + " User with id={} sent a message in private chat id={}", - from != null ? from.getId() : "unknown", - chat.getId()); - } - } - } - case "group", "supergroup" -> { - // group chats - } - default -> { - // should not reach here due to earlier chatType check - } + if (update.hasMessage() && update.getMessage().hasText()) { + String message_text = update.getMessage().getText(); + long chat_id = update.getMessage().getChatId(); + if (message_text.equals("/start")) { + sendMessage( + chat_id, + "Welcome to the SPDF Telegram Bot!\n\n" + + "To get started, please send me a PDF document that you would like to process." 
+ + " Make sure the document is in PDF format.\n\n" + + "Once I receive your document, I'll begin processing it through the pipeline."); + return; } } @@ -190,11 +122,312 @@ public class TelegramPipelineBot extends TelegramLongPollingBot { handleIncomingFile(message); return; } + sendMessage( chat.getId(), "No valid file found in the message. Please send a document to process."); } + // --------------------------- + // Message Extraction / Chat Type + // --------------------------- + + private Message extractMessage(Update update) { + if (update.hasMessage()) return update.getMessage(); + if (update.hasChannelPost()) return update.getChannelPost(); + return null; + } + + private boolean isSupportedChatType(String type) { + return CHAT_PRIVATE.equals(type) + || CHAT_GROUP.equals(type) + || CHAT_SUPERGROUP.equals(type) + || CHAT_CHANNEL.equals(type); + } + + // --------------------------- + // Authorization + // --------------------------- + + private boolean isAuthorized(Message message, Chat chat) { + if (!(telegramProperties.getEnableAllowUserIDs() + || telegramProperties.getEnableAllowChannelIDs())) { + return true; + } + + return switch (chat.getType()) { + case CHAT_CHANNEL -> checkChannelAccess(message, chat); + case CHAT_PRIVATE -> checkUserAccess(message, chat); + case CHAT_GROUP, CHAT_SUPERGROUP -> true; // groups allowed by default + default -> false; + }; + } + + private boolean checkUserAccess(Message message, Chat chat) { + if (!telegramProperties.getEnableAllowUserIDs()) return true; + + User from = message.getFrom(); + List allow = telegramProperties.getAllowUserIDs(); + + if (allow.isEmpty()) { + log.warn("No allowed user IDs configured - allowing all users."); + return true; + } + + if (from == null || !allow.contains(from.getId())) { + log.info( + "Rejecting user {} in private chat {}", + from != null ? 
from.getId() : "unknown", + chat.getId()); + sendMessage(chat.getId(), "You are not authorized to use this bot."); + return false; + } + + return true; + } + + private boolean checkChannelAccess(Message message, Chat chat) { + if (!telegramProperties.getEnableAllowChannelIDs()) return true; + + Chat senderChat = message.getSenderChat(); + List allow = telegramProperties.getAllowChannelIDs(); + + if (allow.isEmpty()) { + log.warn("No allowed channel IDs configured - allowing all channels."); + return true; + } + + if (senderChat == null || !allow.contains(senderChat.getId())) { + log.info( + "Rejecting channel {} in chat {}", + senderChat != null ? senderChat.getId() : "unknown", + chat.getId()); + + sendMessage(chat.getId(), "This channel is not authorized to use this bot."); + return false; + } + + return true; + } + + // --------------------------- + // File Handling + // --------------------------- + + private void handleIncomingFile(Message message) { + Long chatId = message.getChatId(); + Document doc = message.getDocument(); + + if (doc == null) { + sendMessage(chatId, "No document found."); + return; + } + + if (doc.getMimeType() != null + && !ALLOWED_MIME_TYPES.contains(doc.getMimeType().toLowerCase())) { + sendMessage( + chatId, + "Unsupported MIME type: " + + doc.getMimeType() + + "\nAllowed: " + + String.join(", ", ALLOWED_MIME_TYPES)); + return; + } + + if (!hasJsonConfig(chatId)) { + sendMessage( + chatId, + "No JSON configuration file found in the pipeline inbox folder. Please contact the administrator."); + return; + } + + try { + if (!CHAT_CHANNEL.equals(message.getChat().getType())) { + sendMessage(chatId, "File received. Starting processing..."); + } + + PipelineFileInfo info = downloadMessageFile(message); + List outputs = waitForPipelineOutputs(info); + + if (outputs.isEmpty()) { + sendMessage( + chatId, + "No results were found in the pipeline output folder. 
Check" + + " configuration."); + return; + } + + for (Path file : outputs) { + SendDocument out = new SendDocument(); + out.setChatId(chatId); + out.setDocument(new InputFile(file.toFile(), file.getFileName().toString())); + execute(out); + } + + } catch (TelegramApiException e) { + log.error("Telegram API error", e); + sendMessage(chatId, "Telegram API error occurred."); + } catch (IOException e) { + log.error("IO error", e); + sendMessage(chatId, "An IO error occurred."); + } catch (Exception e) { + log.error("Unexpected error", e); + sendMessage(chatId, "Unexpected error occurred."); + } + } + + private PipelineFileInfo downloadMessageFile(Message message) + throws TelegramApiException, IOException { + Document document = message.getDocument(); + String filename = document.getFileName(); + String name = + StringUtils.isNotBlank(filename) ? filename : document.getFileUniqueId() + ".bin"; + + return downloadFile(document.getFileId(), name, message); + } + + private PipelineFileInfo downloadFile(String fileId, String originalName, Message message) + throws TelegramApiException, IOException { + + Long chatId = message.getChatId(); + + Path inboxFolder = getInboxFolder(chatId); + + GetFile getFile = new GetFile(fileId); + File tgFile = execute(getFile); + + if (tgFile == null || StringUtils.isBlank(tgFile.getFilePath())) { + throw new IOException("Telegram did not return a file path."); + } + + URL url = buildDownloadUrl(tgFile.getFilePath()); + + String base = FilenameUtils.getBaseName(originalName) + "-" + UUID.randomUUID(); + String ext = FilenameUtils.getExtension(originalName); + String outFile = ext.isBlank() ? base : base + "." 
+ ext; + + Path targetFile = inboxFolder.resolve(outFile); + + try (InputStream in = url.openStream()) { + Files.copy(in, targetFile); + } + + log.info("Saved Telegram file {} to {}", originalName, targetFile); + return new PipelineFileInfo(targetFile, base, Instant.now()); + } + + private URL buildDownloadUrl(String filePath) throws MalformedURLException { + return URI.create("https://api.telegram.org/file/bot" + getBotToken() + "/" + filePath) + .toURL(); + } + + // --------------------------- + // Inbox folder & JSON check + // --------------------------- + + private Path getInboxFolder(Long chatId) throws IOException { + Path baseInbox = + Paths.get( + runtimePathConfig.getPipelineWatchedFoldersPath(), + telegramProperties.getPipelineInboxFolder()); + + Files.createDirectories(baseInbox); + + Path inboxFolder = + telegramProperties.getCustomFolderSuffix() + ? baseInbox.resolve(chatId.toString()) + : baseInbox; + + Files.createDirectories(inboxFolder); + + return inboxFolder; + } + + private boolean hasJsonConfig(Long chatId) { + try { + Path inboxFolder = getInboxFolder(chatId); + try (Stream s = Files.list(inboxFolder)) { + return s.anyMatch(p -> p.toString().endsWith(".json")); + } + } catch (IOException e) { + log.error("Failed to check JSON config for chat {}", chatId, e); + return false; + } + } + + // --------------------------- + // Pipeline polling + // --------------------------- + + private List waitForPipelineOutputs(PipelineFileInfo info) throws IOException { + + Path finishedDir = Paths.get(runtimePathConfig.getPipelineFinishedFoldersPath()); + Files.createDirectories(finishedDir); + + Instant start = info.savedAt(); + Duration timeout = Duration.ofSeconds(telegramProperties.getProcessingTimeoutSeconds()); + Duration poll = Duration.ofMillis(telegramProperties.getPollingIntervalMillis()); + List results = new ArrayList<>(); + + while (Duration.between(start, Instant.now()).compareTo(timeout) <= 0) { + try (Stream s = Files.walk(finishedDir, 1)) { + 
results = + s.filter(Files::isRegularFile) + .filter(path -> matchesBaseName(info.uniqueBaseName(), path)) + .filter(path -> isNewerThan(path, start)) + .sorted(Comparator.comparing(Path::toString)) + .toList(); + } + + if (!results.isEmpty()) { + break; + } + + synchronized (pipelinePollMonitor) { + try { + pipelinePollMonitor.wait(poll.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + + return results; + } + + private boolean matchesBaseName(String base, Path file) { + return file.getFileName().toString().contains(base); + } + + private boolean isNewerThan(Path path, Instant since) { + try { + return Files.getLastModifiedTime(path).toInstant().isAfter(since); + } catch (IOException e) { + log.debug("Could not read modification time for {}", path); + return false; + } + } + + // --------------------------- + // Messaging + // --------------------------- + + private void sendMessage(Long chatId, String text) { + if (chatId == null) return; + + SendMessage msg = new SendMessage(); + msg.setChatId(chatId); + msg.setText(text); + try { + execute(msg); + } catch (TelegramApiException e) { + log.warn("Failed to send message to {}", chatId, e); + } + } + + private record PipelineFileInfo(Path originalFile, String uniqueBaseName, Instant savedAt) {} + @Override public String getBotUsername() { return telegramProperties.getBotUsername(); @@ -204,204 +437,4 @@ public class TelegramPipelineBot extends TelegramLongPollingBot { public String getBotToken() { return telegramProperties.getBotToken(); } - - private void handleIncomingFile(Message message) { - Long chatId = message.getChatId(); - String chatType = message.getChat() != null ? 
message.getChat().getType() : null; - String[] allowedMimeTypes = {"application/pdf"}; - Document document = message.getDocument(); - if (document != null) { - String mimeType = document.getMimeType(); - if (mimeType != null && !List.of(allowedMimeTypes).contains(mimeType.toLowerCase())) { - sendMessage( - message.getChatId(), - String.format( - "File mime type %s is not allowed. Allowed types are: %s", - mimeType, String.join(", ", allowedMimeTypes))); - return; - } - } - try { - // Only send status messages in private chats and groups, not in channels - if (!Objects.equals(chatType, "channel")) { - sendMessage(chatId, "File received. Starting processing in pipeline folder..."); - } - - PipelineFileInfo fileInfo = downloadMessageFile(message); - List outputs = waitForPipelineOutputs(fileInfo); - - if (outputs.isEmpty()) { - sendMessage( - chatId, - "No results were found in the pipeline finished folder. Please check your" - + " pipeline configuration."); - return; - } - - for (Path output : outputs) { - SendDocument sendDocument = new SendDocument(); - sendDocument.setChatId(chatId); - sendDocument.setDocument( - new InputFile(output.toFile(), output.getFileName().toString())); - execute(sendDocument); - } - - } catch (TelegramApiException e) { - log.error("Telegram API error while processing message {}", message.getMessageId(), e); - sendMessage(chatId, "Error during processing: Telegram API error."); - } catch (IOException e) { - log.error("IO error while processing message {}", message.getMessageId(), e); - sendMessage(chatId, "Error during processing: An IO error occurred."); - } catch (Exception e) { - log.error("Unexpected error while processing message {}", message.getMessageId(), e); - sendMessage(chatId, "Error during processing: An unexpected error occurred."); - } - } - - private PipelineFileInfo downloadMessageFile(Message message) - throws TelegramApiException, IOException { - if (message.hasDocument()) { - return downloadDocument(message); - } - 
throw new IllegalArgumentException("Unsupported file type"); - } - - private PipelineFileInfo downloadDocument(Message message) - throws TelegramApiException, IOException { - Document document = message.getDocument(); - String filename = document.getFileName(); - String name = - StringUtils.isNotBlank(filename) ? filename : document.getFileUniqueId() + ".bin"; - return downloadFile(document.getFileId(), name, message); - } - - private PipelineFileInfo downloadFile(String fileId, String originalName, Message message) - throws TelegramApiException, IOException { - - GetFile getFile = new GetFile(fileId); - File telegramFile = execute(getFile); - - if (telegramFile == null || StringUtils.isBlank(telegramFile.getFilePath())) { - throw new IOException("Telegram did not return a valid file path"); - } - - URL downloadUrl = buildDownloadUrl(telegramFile.getFilePath()); - - Long chatId = message.getChat() != null ? message.getChat().getId() : null; - - Path baseInbox = - Paths.get( - runtimePathConfig.getPipelineWatchedFoldersPath(), - telegramProperties.getPipelineInboxFolder()); - - Files.createDirectories(baseInbox); - - Path inboxFolder = baseInbox; - if (telegramProperties.getCustomFolderSuffix() && chatId != null) { - inboxFolder = baseInbox.resolve(chatId.toString()); - } - - Files.createDirectories(inboxFolder); - - boolean hasJsonConfig = - Files.list(inboxFolder) - .filter(Files::isRegularFile) - .anyMatch(p -> p.toString().endsWith(".json")); - - if (!hasJsonConfig) { - log.info("No JSON configuration file found in inbox folder {}", inboxFolder); - sendMessage( - chatId, - "No JSON configuration file found in the inbox folder. Please contact the administrator."); - } - - String uniqueBaseName = FilenameUtils.getBaseName(originalName) + "-" + UUID.randomUUID(); - String extension = FilenameUtils.getExtension(originalName); - - String targetFilename = - extension.isBlank() ? uniqueBaseName : uniqueBaseName + "." 
+ extension; - - Path targetFile = inboxFolder.resolve(targetFilename); - - try (InputStream inputStream = downloadUrl.openStream()) { - Files.copy(inputStream, targetFile); - } - - log.info("Saved Telegram file {} to {}", originalName, targetFile); - return new PipelineFileInfo(targetFile, uniqueBaseName, Instant.now()); - } - - private URL buildDownloadUrl(String filePath) throws MalformedURLException { - return URI.create( - String.format( - "https://api.telegram.org/file/bot%s/%s", getBotToken(), filePath)) - .toURL(); - } - - private List waitForPipelineOutputs(PipelineFileInfo info) throws IOException { - - Path finishedDir = Paths.get(runtimePathConfig.getPipelineFinishedFoldersPath()); - Files.createDirectories(finishedDir); - - Instant start = info.savedAt(); - Duration timeout = Duration.ofSeconds(telegramProperties.getProcessingTimeoutSeconds()); - Duration pollInterval = Duration.ofMillis(telegramProperties.getPollingIntervalMillis()); - List foundOutputs = new ArrayList<>(); - - while (Duration.between(start, Instant.now()).compareTo(timeout) <= 0) { - try (Stream files = Files.walk(finishedDir, 1)) { - foundOutputs = - files.filter(Files::isRegularFile) - .filter(path -> matchesBaseName(info.uniqueBaseName(), path)) - .filter(path -> isNewerThan(path, start)) - .sorted(Comparator.comparing(Path::toString)) - .toList(); - } - - if (!foundOutputs.isEmpty()) { - break; - } - - synchronized (this) { - try { - wait(pollInterval.toMillis()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - break; - } - } - } - - return foundOutputs; - } - - private boolean matchesBaseName(String baseName, Path path) { - return path.getFileName().toString().contains(baseName); - } - - private boolean isNewerThan(Path path, Instant instant) { - try { - FileTime modifiedTime = Files.getLastModifiedTime(path); - return modifiedTime.toInstant().isAfter(instant); - } catch (IOException e) { - log.debug("Could not read modification time for {}", path, 
e); - return false; - } - } - - private void sendMessage(Long chatId, String text) { - if (chatId == null) { - return; - } - SendMessage message = new SendMessage(); - message.setChatId(chatId); - message.setText(text); - try { - execute(message); - } catch (TelegramApiException e) { - log.warn("Failed to send Telegram message to {}", chatId, e); - } - } - - private record PipelineFileInfo(Path originalFile, String uniqueBaseName, Instant savedAt) {} }