mirror of
https://github.com/Frooodle/Stirling-PDF.git
synced 2026-01-14 20:11:17 +01:00
Refactor TelegramPipelineBot for clarity and modularity
Refactored TelegramPipelineBot to modularize chat type handling, authorization, and file processing logic. Introduced helper methods for message extraction, chat type checks, and authorization, and improved error handling and logging. Simplified file download and pipeline polling, and improved code readability and maintainability.
This commit is contained in:
parent
8aa2d59583
commit
b584b29097
@ -8,13 +8,11 @@ import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.attribute.FileTime;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -48,6 +46,15 @@ import stirling.software.common.model.ApplicationProperties;
|
||||
@ConditionalOnProperty(prefix = "telegram", name = "enabled", havingValue = "true")
|
||||
public class TelegramPipelineBot extends TelegramLongPollingBot {
|
||||
|
||||
private static final String CHAT_PRIVATE = "private";
|
||||
private static final String CHAT_GROUP = "group";
|
||||
private static final String CHAT_SUPERGROUP = "supergroup";
|
||||
private static final String CHAT_CHANNEL = "channel";
|
||||
|
||||
private static final List<String> ALLOWED_MIME_TYPES = List.of("application/pdf");
|
||||
|
||||
private final Object pipelinePollMonitor = new Object();
|
||||
|
||||
private final ApplicationProperties.Telegram telegramProperties;
|
||||
private final RuntimePathConfig runtimePathConfig;
|
||||
private final TelegramBotsApi telegramBotsApi;
|
||||
@ -56,6 +63,7 @@ public class TelegramPipelineBot extends TelegramLongPollingBot {
|
||||
ApplicationProperties applicationProperties,
|
||||
RuntimePathConfig runtimePathConfig,
|
||||
TelegramBotsApi telegramBotsApi) {
|
||||
|
||||
super(applicationProperties.getTelegram().getBotToken());
|
||||
this.telegramProperties = applicationProperties.getTelegram();
|
||||
this.runtimePathConfig = runtimePathConfig;
|
||||
@ -78,111 +86,35 @@ public class TelegramPipelineBot extends TelegramLongPollingBot {
|
||||
|
||||
@Override
|
||||
public void onUpdateReceived(Update update) {
|
||||
Message message = null;
|
||||
|
||||
// 1) Regular messages
|
||||
if (update.hasMessage()) {
|
||||
message = update.getMessage();
|
||||
}
|
||||
// 2) Channel posts
|
||||
else if (update.hasChannelPost()) {
|
||||
message = update.getChannelPost();
|
||||
} else {
|
||||
Message message = extractMessage(update);
|
||||
if (message == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Chat chat = message.getChat();
|
||||
if (chat == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String chatType = chat.getType();
|
||||
if (!Objects.equals(chatType, "private")
|
||||
&& !Objects.equals(chatType, "group")
|
||||
&& !Objects.equals(chatType, "supergroup")
|
||||
&& !Objects.equals(chatType, "channel")) {
|
||||
if (chat == null || !isSupportedChatType(chat.getType())) {
|
||||
log.debug(
|
||||
"Ignoring message {} in chat {} with unsupported chat type {}",
|
||||
"Ignoring message {}, unsupported chat type {}",
|
||||
message.getMessageId(),
|
||||
chat.getId(),
|
||||
chatType);
|
||||
chat != null ? chat.getType() : "null");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info(
|
||||
"Received message {} in chat {} (type={}) message {}",
|
||||
message.getMessageId(),
|
||||
chat.getId(),
|
||||
chatType,
|
||||
message);
|
||||
if (!isAuthorized(message, chat)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (telegramProperties.getEnableAllowUserIDs()
|
||||
|| telegramProperties.getEnableAllowChannelIDs()) {
|
||||
List<Long> allowUserIDs = telegramProperties.getAllowUserIDs();
|
||||
List<Long> allowChannelIDs = telegramProperties.getAllowChannelIDs();
|
||||
switch (chatType) {
|
||||
case "channel" -> {
|
||||
// In channels, messages are always sent on behalf of the channel
|
||||
if (telegramProperties.getEnableAllowChannelIDs()) {
|
||||
Chat senderChat = message.getSenderChat();
|
||||
if ((senderChat == null || !allowChannelIDs.contains(senderChat.getId()))
|
||||
&& !allowChannelIDs.isEmpty()) {
|
||||
log.info(
|
||||
"Ignoring message {} from user id={} in private chat id={} due"
|
||||
+ " to channel access restrictions",
|
||||
message.getMessageId(),
|
||||
senderChat != null ? senderChat.getId() : "unknown",
|
||||
chat.getId());
|
||||
sendMessage(
|
||||
chat.getId(),
|
||||
"This channel is not authorized to use this bot. Please contact"
|
||||
+ " the administrator.");
|
||||
return;
|
||||
}
|
||||
if (allowChannelIDs.isEmpty()) {
|
||||
// All channels are allowed, but log a warning
|
||||
log.warn(
|
||||
"No allowed channel IDs configured, allowing all channels"
|
||||
+ " access. Channel with id={} sent a message in chat"
|
||||
+ " id={}",
|
||||
senderChat != null ? senderChat.getId() : "unknown",
|
||||
chat.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
case "private" -> {
|
||||
// In private chats, messages are sent by users
|
||||
if (telegramProperties.getEnableAllowUserIDs()) {
|
||||
User from = message.getFrom();
|
||||
if ((from == null || !allowUserIDs.contains(from.getId()))
|
||||
&& !allowUserIDs.isEmpty()) {
|
||||
log.info(
|
||||
"Ignoring message {} from channel id={} due to user access"
|
||||
+ " restrictions",
|
||||
message.getMessageId(),
|
||||
chat.getId());
|
||||
sendMessage(
|
||||
chat.getId(),
|
||||
"You are not authorized to use this bot. Please contact the"
|
||||
+ " administrator.");
|
||||
return;
|
||||
}
|
||||
if (allowUserIDs.isEmpty()) {
|
||||
// All users are allowed, but log a warning
|
||||
log.warn(
|
||||
"No allowed user IDs configured, allowing all users access."
|
||||
+ " User with id={} sent a message in private chat id={}",
|
||||
from != null ? from.getId() : "unknown",
|
||||
chat.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
case "group", "supergroup" -> {
|
||||
// group chats
|
||||
}
|
||||
default -> {
|
||||
// should not reach here due to earlier chatType check
|
||||
}
|
||||
if (update.hasMessage() && update.getMessage().hasText()) {
|
||||
String message_text = update.getMessage().getText();
|
||||
long chat_id = update.getMessage().getChatId();
|
||||
if (message_text.equals("/start")) {
|
||||
sendMessage(
|
||||
chat_id,
|
||||
"Welcome to the SPDF Telegram Bot!\n\n"
|
||||
+ "To get started, please send me a PDF document that you would like to process."
|
||||
+ " Make sure the document is in PDF format.\n\n"
|
||||
+ "Once I receive your document, I'll begin processing it through the pipeline.");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
@ -190,11 +122,312 @@ public class TelegramPipelineBot extends TelegramLongPollingBot {
|
||||
handleIncomingFile(message);
|
||||
return;
|
||||
}
|
||||
|
||||
sendMessage(
|
||||
chat.getId(),
|
||||
"No valid file found in the message. Please send a document to process.");
|
||||
}
|
||||
|
||||
// ---------------------------
|
||||
// Message Extraction / Chat Type
|
||||
// ---------------------------
|
||||
|
||||
private Message extractMessage(Update update) {
|
||||
if (update.hasMessage()) return update.getMessage();
|
||||
if (update.hasChannelPost()) return update.getChannelPost();
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean isSupportedChatType(String type) {
|
||||
return CHAT_PRIVATE.equals(type)
|
||||
|| CHAT_GROUP.equals(type)
|
||||
|| CHAT_SUPERGROUP.equals(type)
|
||||
|| CHAT_CHANNEL.equals(type);
|
||||
}
|
||||
|
||||
// ---------------------------
|
||||
// Authorization
|
||||
// ---------------------------
|
||||
|
||||
private boolean isAuthorized(Message message, Chat chat) {
|
||||
if (!(telegramProperties.getEnableAllowUserIDs()
|
||||
|| telegramProperties.getEnableAllowChannelIDs())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return switch (chat.getType()) {
|
||||
case CHAT_CHANNEL -> checkChannelAccess(message, chat);
|
||||
case CHAT_PRIVATE -> checkUserAccess(message, chat);
|
||||
case CHAT_GROUP, CHAT_SUPERGROUP -> true; // groups allowed by default
|
||||
default -> false;
|
||||
};
|
||||
}
|
||||
|
||||
private boolean checkUserAccess(Message message, Chat chat) {
|
||||
if (!telegramProperties.getEnableAllowUserIDs()) return true;
|
||||
|
||||
User from = message.getFrom();
|
||||
List<Long> allow = telegramProperties.getAllowUserIDs();
|
||||
|
||||
if (allow.isEmpty()) {
|
||||
log.warn("No allowed user IDs configured - allowing all users.");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (from == null || !allow.contains(from.getId())) {
|
||||
log.info(
|
||||
"Rejecting user {} in private chat {}",
|
||||
from != null ? from.getId() : "unknown",
|
||||
chat.getId());
|
||||
sendMessage(chat.getId(), "You are not authorized to use this bot.");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean checkChannelAccess(Message message, Chat chat) {
|
||||
if (!telegramProperties.getEnableAllowChannelIDs()) return true;
|
||||
|
||||
Chat senderChat = message.getSenderChat();
|
||||
List<Long> allow = telegramProperties.getAllowChannelIDs();
|
||||
|
||||
if (allow.isEmpty()) {
|
||||
log.warn("No allowed channel IDs configured - allowing all channels.");
|
||||
return true;
|
||||
}
|
||||
|
||||
if (senderChat == null || !allow.contains(senderChat.getId())) {
|
||||
log.info(
|
||||
"Rejecting channel {} in chat {}",
|
||||
senderChat != null ? senderChat.getId() : "unknown",
|
||||
chat.getId());
|
||||
|
||||
sendMessage(chat.getId(), "This channel is not authorized to use this bot.");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---------------------------
|
||||
// File Handling
|
||||
// ---------------------------
|
||||
|
||||
private void handleIncomingFile(Message message) {
|
||||
Long chatId = message.getChatId();
|
||||
Document doc = message.getDocument();
|
||||
|
||||
if (doc == null) {
|
||||
sendMessage(chatId, "No document found.");
|
||||
return;
|
||||
}
|
||||
|
||||
if (doc.getMimeType() != null
|
||||
&& !ALLOWED_MIME_TYPES.contains(doc.getMimeType().toLowerCase())) {
|
||||
sendMessage(
|
||||
chatId,
|
||||
"Unsupported MIME type: "
|
||||
+ doc.getMimeType()
|
||||
+ "\nAllowed: "
|
||||
+ String.join(", ", ALLOWED_MIME_TYPES));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!hasJsonConfig(chatId)) {
|
||||
sendMessage(
|
||||
chatId,
|
||||
"No JSON configuration file found in the pipeline inbox folder. Please contact the administrator.");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
if (!CHAT_CHANNEL.equals(message.getChat().getType())) {
|
||||
sendMessage(chatId, "File received. Starting processing...");
|
||||
}
|
||||
|
||||
PipelineFileInfo info = downloadMessageFile(message);
|
||||
List<Path> outputs = waitForPipelineOutputs(info);
|
||||
|
||||
if (outputs.isEmpty()) {
|
||||
sendMessage(
|
||||
chatId,
|
||||
"No results were found in the pipeline output folder. Check"
|
||||
+ " configuration.");
|
||||
return;
|
||||
}
|
||||
|
||||
for (Path file : outputs) {
|
||||
SendDocument out = new SendDocument();
|
||||
out.setChatId(chatId);
|
||||
out.setDocument(new InputFile(file.toFile(), file.getFileName().toString()));
|
||||
execute(out);
|
||||
}
|
||||
|
||||
} catch (TelegramApiException e) {
|
||||
log.error("Telegram API error", e);
|
||||
sendMessage(chatId, "Telegram API error occurred.");
|
||||
} catch (IOException e) {
|
||||
log.error("IO error", e);
|
||||
sendMessage(chatId, "An IO error occurred.");
|
||||
} catch (Exception e) {
|
||||
log.error("Unexpected error", e);
|
||||
sendMessage(chatId, "Unexpected error occurred.");
|
||||
}
|
||||
}
|
||||
|
||||
private PipelineFileInfo downloadMessageFile(Message message)
|
||||
throws TelegramApiException, IOException {
|
||||
Document document = message.getDocument();
|
||||
String filename = document.getFileName();
|
||||
String name =
|
||||
StringUtils.isNotBlank(filename) ? filename : document.getFileUniqueId() + ".bin";
|
||||
|
||||
return downloadFile(document.getFileId(), name, message);
|
||||
}
|
||||
|
||||
private PipelineFileInfo downloadFile(String fileId, String originalName, Message message)
|
||||
throws TelegramApiException, IOException {
|
||||
|
||||
Long chatId = message.getChatId();
|
||||
|
||||
Path inboxFolder = getInboxFolder(chatId);
|
||||
|
||||
GetFile getFile = new GetFile(fileId);
|
||||
File tgFile = execute(getFile);
|
||||
|
||||
if (tgFile == null || StringUtils.isBlank(tgFile.getFilePath())) {
|
||||
throw new IOException("Telegram did not return a file path.");
|
||||
}
|
||||
|
||||
URL url = buildDownloadUrl(tgFile.getFilePath());
|
||||
|
||||
String base = FilenameUtils.getBaseName(originalName) + "-" + UUID.randomUUID();
|
||||
String ext = FilenameUtils.getExtension(originalName);
|
||||
String outFile = ext.isBlank() ? base : base + "." + ext;
|
||||
|
||||
Path targetFile = inboxFolder.resolve(outFile);
|
||||
|
||||
try (InputStream in = url.openStream()) {
|
||||
Files.copy(in, targetFile);
|
||||
}
|
||||
|
||||
log.info("Saved Telegram file {} to {}", originalName, targetFile);
|
||||
return new PipelineFileInfo(targetFile, base, Instant.now());
|
||||
}
|
||||
|
||||
private URL buildDownloadUrl(String filePath) throws MalformedURLException {
|
||||
return URI.create("https://api.telegram.org/file/bot" + getBotToken() + "/" + filePath)
|
||||
.toURL();
|
||||
}
|
||||
|
||||
// ---------------------------
|
||||
// Inbox-Ordner & JSON-Check
|
||||
// ---------------------------
|
||||
|
||||
private Path getInboxFolder(Long chatId) throws IOException {
|
||||
Path baseInbox =
|
||||
Paths.get(
|
||||
runtimePathConfig.getPipelineWatchedFoldersPath(),
|
||||
telegramProperties.getPipelineInboxFolder());
|
||||
|
||||
Files.createDirectories(baseInbox);
|
||||
|
||||
Path inboxFolder =
|
||||
telegramProperties.getCustomFolderSuffix()
|
||||
? baseInbox.resolve(chatId.toString())
|
||||
: baseInbox;
|
||||
|
||||
Files.createDirectories(inboxFolder);
|
||||
|
||||
return inboxFolder;
|
||||
}
|
||||
|
||||
private boolean hasJsonConfig(Long chatId) {
|
||||
try {
|
||||
Path inboxFolder = getInboxFolder(chatId);
|
||||
try (Stream<Path> s = Files.list(inboxFolder)) {
|
||||
return s.anyMatch(p -> p.toString().endsWith(".json"));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to check JSON config for chat {}", chatId, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------
|
||||
// Pipeline polling
|
||||
// ---------------------------
|
||||
|
||||
private List<Path> waitForPipelineOutputs(PipelineFileInfo info) throws IOException {
|
||||
|
||||
Path finishedDir = Paths.get(runtimePathConfig.getPipelineFinishedFoldersPath());
|
||||
Files.createDirectories(finishedDir);
|
||||
|
||||
Instant start = info.savedAt();
|
||||
Duration timeout = Duration.ofSeconds(telegramProperties.getProcessingTimeoutSeconds());
|
||||
Duration poll = Duration.ofMillis(telegramProperties.getPollingIntervalMillis());
|
||||
List<Path> results = new ArrayList<>();
|
||||
|
||||
while (Duration.between(start, Instant.now()).compareTo(timeout) <= 0) {
|
||||
try (Stream<Path> s = Files.walk(finishedDir, 1)) {
|
||||
results =
|
||||
s.filter(Files::isRegularFile)
|
||||
.filter(path -> matchesBaseName(info.uniqueBaseName(), path))
|
||||
.filter(path -> isNewerThan(path, start))
|
||||
.sorted(Comparator.comparing(Path::toString))
|
||||
.toList();
|
||||
}
|
||||
|
||||
if (!results.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
synchronized (pipelinePollMonitor) {
|
||||
try {
|
||||
pipelinePollMonitor.wait(poll.toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
private boolean matchesBaseName(String base, Path file) {
|
||||
return file.getFileName().toString().contains(base);
|
||||
}
|
||||
|
||||
private boolean isNewerThan(Path path, Instant since) {
|
||||
try {
|
||||
return Files.getLastModifiedTime(path).toInstant().isAfter(since);
|
||||
} catch (IOException e) {
|
||||
log.debug("Could not read modification time for {}", path);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------
|
||||
// Messaging
|
||||
// ---------------------------
|
||||
|
||||
private void sendMessage(Long chatId, String text) {
|
||||
if (chatId == null) return;
|
||||
|
||||
SendMessage msg = new SendMessage();
|
||||
msg.setChatId(chatId);
|
||||
msg.setText(text);
|
||||
try {
|
||||
execute(msg);
|
||||
} catch (TelegramApiException e) {
|
||||
log.warn("Failed to send message to {}", chatId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private record PipelineFileInfo(Path originalFile, String uniqueBaseName, Instant savedAt) {}
|
||||
|
||||
@Override
|
||||
public String getBotUsername() {
|
||||
return telegramProperties.getBotUsername();
|
||||
@ -204,204 +437,4 @@ public class TelegramPipelineBot extends TelegramLongPollingBot {
|
||||
public String getBotToken() {
|
||||
return telegramProperties.getBotToken();
|
||||
}
|
||||
|
||||
private void handleIncomingFile(Message message) {
|
||||
Long chatId = message.getChatId();
|
||||
String chatType = message.getChat() != null ? message.getChat().getType() : null;
|
||||
String[] allowedMimeTypes = {"application/pdf"};
|
||||
Document document = message.getDocument();
|
||||
if (document != null) {
|
||||
String mimeType = document.getMimeType();
|
||||
if (mimeType != null && !List.of(allowedMimeTypes).contains(mimeType.toLowerCase())) {
|
||||
sendMessage(
|
||||
message.getChatId(),
|
||||
String.format(
|
||||
"File mime type %s is not allowed. Allowed types are: %s",
|
||||
mimeType, String.join(", ", allowedMimeTypes)));
|
||||
return;
|
||||
}
|
||||
}
|
||||
try {
|
||||
// Only send status messages in private chats and groups, not in channels
|
||||
if (!Objects.equals(chatType, "channel")) {
|
||||
sendMessage(chatId, "File received. Starting processing in pipeline folder...");
|
||||
}
|
||||
|
||||
PipelineFileInfo fileInfo = downloadMessageFile(message);
|
||||
List<Path> outputs = waitForPipelineOutputs(fileInfo);
|
||||
|
||||
if (outputs.isEmpty()) {
|
||||
sendMessage(
|
||||
chatId,
|
||||
"No results were found in the pipeline finished folder. Please check your"
|
||||
+ " pipeline configuration.");
|
||||
return;
|
||||
}
|
||||
|
||||
for (Path output : outputs) {
|
||||
SendDocument sendDocument = new SendDocument();
|
||||
sendDocument.setChatId(chatId);
|
||||
sendDocument.setDocument(
|
||||
new InputFile(output.toFile(), output.getFileName().toString()));
|
||||
execute(sendDocument);
|
||||
}
|
||||
|
||||
} catch (TelegramApiException e) {
|
||||
log.error("Telegram API error while processing message {}", message.getMessageId(), e);
|
||||
sendMessage(chatId, "Error during processing: Telegram API error.");
|
||||
} catch (IOException e) {
|
||||
log.error("IO error while processing message {}", message.getMessageId(), e);
|
||||
sendMessage(chatId, "Error during processing: An IO error occurred.");
|
||||
} catch (Exception e) {
|
||||
log.error("Unexpected error while processing message {}", message.getMessageId(), e);
|
||||
sendMessage(chatId, "Error during processing: An unexpected error occurred.");
|
||||
}
|
||||
}
|
||||
|
||||
private PipelineFileInfo downloadMessageFile(Message message)
|
||||
throws TelegramApiException, IOException {
|
||||
if (message.hasDocument()) {
|
||||
return downloadDocument(message);
|
||||
}
|
||||
throw new IllegalArgumentException("Unsupported file type");
|
||||
}
|
||||
|
||||
private PipelineFileInfo downloadDocument(Message message)
|
||||
throws TelegramApiException, IOException {
|
||||
Document document = message.getDocument();
|
||||
String filename = document.getFileName();
|
||||
String name =
|
||||
StringUtils.isNotBlank(filename) ? filename : document.getFileUniqueId() + ".bin";
|
||||
return downloadFile(document.getFileId(), name, message);
|
||||
}
|
||||
|
||||
private PipelineFileInfo downloadFile(String fileId, String originalName, Message message)
|
||||
throws TelegramApiException, IOException {
|
||||
|
||||
GetFile getFile = new GetFile(fileId);
|
||||
File telegramFile = execute(getFile);
|
||||
|
||||
if (telegramFile == null || StringUtils.isBlank(telegramFile.getFilePath())) {
|
||||
throw new IOException("Telegram did not return a valid file path");
|
||||
}
|
||||
|
||||
URL downloadUrl = buildDownloadUrl(telegramFile.getFilePath());
|
||||
|
||||
Long chatId = message.getChat() != null ? message.getChat().getId() : null;
|
||||
|
||||
Path baseInbox =
|
||||
Paths.get(
|
||||
runtimePathConfig.getPipelineWatchedFoldersPath(),
|
||||
telegramProperties.getPipelineInboxFolder());
|
||||
|
||||
Files.createDirectories(baseInbox);
|
||||
|
||||
Path inboxFolder = baseInbox;
|
||||
if (telegramProperties.getCustomFolderSuffix() && chatId != null) {
|
||||
inboxFolder = baseInbox.resolve(chatId.toString());
|
||||
}
|
||||
|
||||
Files.createDirectories(inboxFolder);
|
||||
|
||||
boolean hasJsonConfig =
|
||||
Files.list(inboxFolder)
|
||||
.filter(Files::isRegularFile)
|
||||
.anyMatch(p -> p.toString().endsWith(".json"));
|
||||
|
||||
if (!hasJsonConfig) {
|
||||
log.info("No JSON configuration file found in inbox folder {}", inboxFolder);
|
||||
sendMessage(
|
||||
chatId,
|
||||
"No JSON configuration file found in the inbox folder. Please contact the administrator.");
|
||||
}
|
||||
|
||||
String uniqueBaseName = FilenameUtils.getBaseName(originalName) + "-" + UUID.randomUUID();
|
||||
String extension = FilenameUtils.getExtension(originalName);
|
||||
|
||||
String targetFilename =
|
||||
extension.isBlank() ? uniqueBaseName : uniqueBaseName + "." + extension;
|
||||
|
||||
Path targetFile = inboxFolder.resolve(targetFilename);
|
||||
|
||||
try (InputStream inputStream = downloadUrl.openStream()) {
|
||||
Files.copy(inputStream, targetFile);
|
||||
}
|
||||
|
||||
log.info("Saved Telegram file {} to {}", originalName, targetFile);
|
||||
return new PipelineFileInfo(targetFile, uniqueBaseName, Instant.now());
|
||||
}
|
||||
|
||||
private URL buildDownloadUrl(String filePath) throws MalformedURLException {
|
||||
return URI.create(
|
||||
String.format(
|
||||
"https://api.telegram.org/file/bot%s/%s", getBotToken(), filePath))
|
||||
.toURL();
|
||||
}
|
||||
|
||||
private List<Path> waitForPipelineOutputs(PipelineFileInfo info) throws IOException {
|
||||
|
||||
Path finishedDir = Paths.get(runtimePathConfig.getPipelineFinishedFoldersPath());
|
||||
Files.createDirectories(finishedDir);
|
||||
|
||||
Instant start = info.savedAt();
|
||||
Duration timeout = Duration.ofSeconds(telegramProperties.getProcessingTimeoutSeconds());
|
||||
Duration pollInterval = Duration.ofMillis(telegramProperties.getPollingIntervalMillis());
|
||||
List<Path> foundOutputs = new ArrayList<>();
|
||||
|
||||
while (Duration.between(start, Instant.now()).compareTo(timeout) <= 0) {
|
||||
try (Stream<Path> files = Files.walk(finishedDir, 1)) {
|
||||
foundOutputs =
|
||||
files.filter(Files::isRegularFile)
|
||||
.filter(path -> matchesBaseName(info.uniqueBaseName(), path))
|
||||
.filter(path -> isNewerThan(path, start))
|
||||
.sorted(Comparator.comparing(Path::toString))
|
||||
.toList();
|
||||
}
|
||||
|
||||
if (!foundOutputs.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
try {
|
||||
wait(pollInterval.toMillis());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return foundOutputs;
|
||||
}
|
||||
|
||||
private boolean matchesBaseName(String baseName, Path path) {
|
||||
return path.getFileName().toString().contains(baseName);
|
||||
}
|
||||
|
||||
private boolean isNewerThan(Path path, Instant instant) {
|
||||
try {
|
||||
FileTime modifiedTime = Files.getLastModifiedTime(path);
|
||||
return modifiedTime.toInstant().isAfter(instant);
|
||||
} catch (IOException e) {
|
||||
log.debug("Could not read modification time for {}", path, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private void sendMessage(Long chatId, String text) {
|
||||
if (chatId == null) {
|
||||
return;
|
||||
}
|
||||
SendMessage message = new SendMessage();
|
||||
message.setChatId(chatId);
|
||||
message.setText(text);
|
||||
try {
|
||||
execute(message);
|
||||
} catch (TelegramApiException e) {
|
||||
log.warn("Failed to send Telegram message to {}", chatId, e);
|
||||
}
|
||||
}
|
||||
|
||||
private record PipelineFileInfo(Path originalFile, String uniqueBaseName, Instant savedAt) {}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user