diff --git a/mailToTelegramForwarder.py b/mailToTelegramForwarder.py index 490ab3c..949c67f 100644 --- a/mailToTelegramForwarder.py +++ b/mailToTelegramForwarder.py @@ -18,6 +18,7 @@ """ try: import warnings + import typing import sys import re import unicodedata @@ -46,7 +47,7 @@ """ __appname__ = "Mail to Telegram Forwarder" -__version__ = "0.2.0" +__version__ = "0.2.1" __author__ = "Awalon (https://github.com/awalon)" with warnings.catch_warnings(record=True) as w: @@ -63,6 +64,15 @@ class Tool: + mask_error_data: [str] = [] + + def decode_mail_data(self, value) -> str: + result = '' + for msg_part in email.header.decode_header(value): + part, encoding = msg_part + result += self.binary_to_string(part, encoding=encoding) + return result + @staticmethod def binary_to_string(value, **kwargs) -> str: encoding = kwargs.get('encoding') @@ -77,31 +87,43 @@ def binary_to_string(value, **kwargs) -> str: else: return str(value) - @staticmethod - def build_error_message(message): - error_message = message + def _convert_error_message(self, message) -> str: + error_message: str = message + if type(message) is bytes: + error_message = self.binary_to_string(message) if type(message) is not str: error_message = '%s' % message # ', '.join(map(str, message.args)) try: exc_type, exc_obj, tb = sys.exc_info() - line_no = '' - obj_name = '' - file_name = '' if tb is not None: frame = tb.tb_frame line_no = tb.tb_lineno obj_name = frame.f_code.co_name file_name = frame.f_code.co_filename - trace_msg = " [%s:%s in '%s']" % (file_name, line_no, obj_name) - error_message = '%s%s' % (error_message, trace_msg) + trace_msg = " [%s:%s in '%s']" % (file_name, line_no, obj_name) + error_message = '%s%s' % (error_message, trace_msg) except Exception as ex: logging.error('Fatal in "build_error_message": %s' % str(ex)) logging.error('--- initial error: "%s"' % message) return error_message + def build_error_message(self, message) -> str: + error_message: str + if type(message) is list: + lines: [str] = [] + for item in message: + lines.append(self._convert_error_message(item)) + error_message = "; ".join(lines) + else: + error_message = self._convert_error_message(message) + for mask in self.mask_error_data: + error_message = error_message.replace(mask, '****') + return error_message + class Config: config_parser = None + tool: Tool imap_user = None imap_password = None @@ -126,12 +148,13 @@ class Config: tg_forward_attachment = True tg_forward_embedded_images = True - def __init__(self, cmd_args): + def __init__(self, tool, cmd_args): """ Parse config file to obtain login credentials, address of remote mail server, telegram config and configuration which controls behaviour of this script . """ try: + self.tool = tool self.config_parser = configparser.ConfigParser() files = self.config_parser.read(cmd_args.config) if len(files) == 0: @@ -140,6 +163,7 @@ def __init__(self, cmd_args): self.imap_user = self.get_config('Mail', 'user', self.imap_user) self.imap_password = self.get_config('Mail', 'password', self.imap_password) + tool.mask_error_data.append(self.imap_password) self.imap_server = self.get_config('Mail', 'server', self.imap_server) self.imap_port = self.get_config('Mail', 'port', self.imap_port, int) self.imap_timeout = self.get_config('Mail', 'timeout', self.imap_timeout, int) @@ -153,6 +177,7 @@ def __init__(self, cmd_args): self.imap_max_length = self.get_config('Mail', 'max_length', self.imap_max_length, int) self.tg_bot_token = self.get_config('Telegram', 'bot_token', self.tg_bot_token) + tool.mask_error_data.append(self.tg_bot_token) self.tg_forward_to_chat_id = self.get_config('Telegram', 'forward_to_chat_id', self.tg_forward_to_chat_id, int) self.tg_forward_mail_content = self.get_config('Telegram', 'forward_mail_content', @@ -167,13 +192,13 @@ def __init__(self, cmd_args): self.imap_read_old_mails = True except configparser.ParsingError as parse_error: - logging.critical(Tool.build_error_message( + logging.critical( "Error parsing config file: Impossible to parse file %s. Message: %s" - % (parse_error.source, parse_error.message)) + % (parse_error.source, parse_error.message) ) sys.exit(2) except configparser.Error as config_error: - logging.critical(Tool.build_error_message("Error parsing config file: %s." % config_error.message)) + logging.critical("Error parsing config file: %s." % config_error.message) sys.exit(2) def get_config(self, section, key, default=None, value_type=None): @@ -198,12 +223,12 @@ def get_config(self, section, key, default=None, value_type=None): raise configparser.NoSectionError(section) except configparser.Error as config_error: - logging.critical(Tool.build_error_message("Error parsing config file: %s." % config_error.message)) + logging.critical("Error parsing config file: %s." % config_error.message) raise config_error except Exception as get_val_error: - logging.critical(Tool.build_error_message( + logging.critical( "Get config value error for '%s'.'%s' (default: '%s'): %s." - % (section, key, default, get_val_error)) + % (section, key, default, get_val_error) ) raise get_val_error @@ -216,25 +241,26 @@ class MailAttachmentType(Enum): class MailAttachment: - idx = 0 - id = '' - name = '' - alt = '' - type = MailAttachmentType.BINARY - file = None - tg_id = None - - def __init__(self, attachment_type=MailAttachmentType.BINARY): + idx: int = 0 + id: str = '' + name: str = '' + alt: str = '' + type: MailAttachmentType = MailAttachmentType.BINARY + file: typing.Optional[str] = None + tg_id: typing.Optional[str] = None + + def __init__(self, attachment_type: MailAttachmentType = MailAttachmentType.BINARY): self.type = attachment_type - def set_name(self, file_name): - name = '' + def set_name(self, file_name: str): + name: str = '' for file_name_part in email.header.decode_header(file_name): part, encoding = file_name_part - name += Tool.binary_to_string(part, encoding=encoding) + tool: Tool = Tool() + name += tool.binary_to_string(part, encoding=encoding) self.name = name - def set_id(self, attachment_id): + def set_id(self, attachment_id: str): self.id = re.sub(r'[<>]', '', attachment_id) def get_title(self): @@ -247,36 +273,47 @@ def get_title(self): class MailBody: - text = '' - html = '' - images: dict[str, MailAttachment] = {} - attachments: [MailAttachment] = [] + text: str = '' + html: str = '' + images: typing.Optional[dict[str, MailAttachment]] = None + attachments: typing.Optional[list[MailAttachment]] = None -class MailData: +class MailDataType(Enum): TEXT = 1 HTML = 2 - uid = '' - raw = '' - type = TEXT - summary = '' - mail_from = '' - mail_subject = '' - mail_body = '' - mail_images: dict[str, MailAttachment] = {} - attachment_summary = '' - attachments = [MailAttachment] + +class MailImages(typing.TypedDict): + key: str + image: MailAttachment + + +class MailAttachments(typing.List): + attachment: MailAttachment + + +class MailData: + uid: str = '' + raw: str = '' + type: MailDataType = MailDataType.TEXT + summary: str = '' + mail_from: str = '' + mail_subject: str = '' + mail_body: str = '' + mail_images: typing.Optional[MailImages] = None + attachment_summary: str = '' + attachments: typing.Optional[MailAttachments] = None class TelegramBot: config: Config - def __init__(self, config): + def __init__(self, config: Config): self.config = config @staticmethod - def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str: + def cleanup_html(message: str, images: typing.Optional[dict[str, MailAttachment]] = None) -> str: """ Parse HTML message and remove HTML elements not supported by Telegram """ @@ -297,8 +334,8 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str: # tg_msg = re.sub(r'<\s*span\b', ']*>(?P.*).*$', '\g', tg_body, @@ -311,15 +348,18 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str: tg_body = re.sub(r'', '', tg_body, flags=(re.DOTALL | re.MULTILINE)) # handle inline images - image_seen = {} - for match in re.finditer(r'(?P<\s*img\s+[^>]*?\s*src\s*=\s*"(?P(?P(cid|https?):/*(?P[^"]*)))"[^>]*?/?\s*>)', - tg_body, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)): - img = match.group('img') - proto = match.group('proto') + image_seen: dict[str] = {} + for match in re.finditer( + r'(?P<\s*img\s+[^>]*?\s*src\s*=\s*"' + r'(?P(?P(cid|https?):/*(?P[^"]*)))"[^>]*?/?\s*>)', + tg_body, + flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)): + img: str = match.group('img') + proto: str = match.group('proto') # extract alt or title value - alt = re.sub(r'^.*?((title|alt)\s*=\s*"(?P[^"]+)")?.*$', '\g', - img, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)) + alt: str = re.sub(r'^.*?((title|alt)\s*=\s*"(?P[^"]+)")?.*$', '\g', + img, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)) if 'http' in proto: # web link @@ -372,7 +412,6 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str: r'<\s*(?!/?\s*(?Pbold|strong|i|em|u|ins|s|strike|del|b|a|code|pre)\b)[^>]*>', flags=(re.MULTILINE | re.IGNORECASE)) tg_msg = re.sub(regex_filter_elem, ' ', tg_msg) - #tg_msg = re.sub(r']*>', '', tg_msg, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)) # remove empty links tg_msg = re.sub(r'<\s*a\s*>(?P[^<]*)', '\g ', tg_msg, @@ -392,10 +431,9 @@ def cleanup_html(message: str, images: dict[str, MailAttachment] = None) -> str: tg_msg = re.sub(r' ', ' ', tg_msg, flags=re.IGNORECASE) except Exception as ex: - logging.critical(Tool.build_error_message(ex)) + logging.critical(ex) - finally: - return tg_msg + return tg_msg def send_message(self, mails: [MailData]): """ @@ -418,7 +456,7 @@ def send_message(self, mails: [MailData]): parser = telegram.ParseMode.MARKDOWN_V2 else: parser = telegram.ParseMode.MARKDOWN - if mail.type == MailData.HTML: + if mail.type == MailDataType.HTML: parser = telegram.ParseMode.HTML if self.config.tg_forward_mail_content or not self.config.tg_forward_attachment: @@ -449,7 +487,7 @@ def send_message(self, mails: [MailData]): image_no += 1 # write image links - for img_link in re.finditer(r'(\${img-link:(?P[^\|]*)\|(?P[^}]*)})', message, + for img_link in re.finditer(r'(\${img-link:(?P[^|]*)\|(?P[^}]*)})', message, flags=(re.DOTALL | re.MULTILINE | re.IGNORECASE)): src = img_link.group('src') alt = img_link.group('alt') @@ -470,7 +508,7 @@ def send_message(self, mails: [MailData]): if self.config.tg_forward_attachment and len(mail.attachments) > 0: for attachment in mail.attachments: subject = mail.mail_subject - if mail.type == MailData.HTML: + if mail.type == MailDataType.HTML: file_name = attachment.name caption = '' + subject + ':\n' + file_name else: @@ -492,7 +530,7 @@ def send_message(self, mails: [MailData]): except telegram.TelegramError as tg_mail_error: msg = "❌ Failed to send Telegram message (UID: %s) to '%s': %s" \ % (mail.uid, str(self.config.tg_forward_to_chat_id), tg_mail_error.message) - logging.critical(Tool.build_error_message(msg)) + logging.critical(msg) try: # try to send error via telegram, and ignore further errors bot.send_message(chat_id=self.config.tg_forward_to_chat_id, @@ -503,10 +541,10 @@ def send_message(self, mails: [MailData]): pass except Exception as send_mail_error: - error_msgs = [Tool.binary_to_string(arg) for arg in send_mail_error.args] + error_msgs = [self.config.tool.binary_to_string(arg) for arg in send_mail_error.args] msg = "Failed to send Telegram message (UID: %s) to '%s': %s" \ % (mail.uid, str(self.config.tg_forward_to_chat_id), ', '.join(error_msgs)) - logging.critical(Tool.build_error_message(msg)) + logging.critical(msg) try: # try to send error via telegram, and ignore further errors bot.send_message(chat_id=self.config.tg_forward_to_chat_id, @@ -517,19 +555,19 @@ def send_message(self, mails: [MailData]): pass except telegram.TelegramError as tg_error: - logging.critical(Tool.build_error_message("Failed to send Telegram message: %s" % tg_error.message)) + logging.critical("Failed to send Telegram message: %s" % tg_error.message) return False except Exception as send_error: - error_msgs = [Tool.binary_to_string(arg) for arg in send_error.args] - logging.critical(Tool.build_error_message("Failed to send Telegram message: %s" % ', '.join(error_msgs))) + error_msgs = [self.config.tool.binary_to_string(arg) for arg in send_error.args] + logging.critical("Failed to send Telegram message: %s" % ', '.join(error_msgs)) return False return True class Mail: - mailbox: imaplib2.IMAP4_SSL + mailbox: typing.Optional[imaplib2.IMAP4_SSL] = None config: Config last_uid: str = '' @@ -562,7 +600,7 @@ def __init__(self, config): raise self.MailError(msg, gai_error) except imaplib2.IMAP4_SSL.error as imap_ssl_error: - error_msgs = [Tool.binary_to_string(arg) for arg in imap_ssl_error.args] + error_msgs = [self.config.tool.binary_to_string(arg) for arg in imap_ssl_error.args] msg = "Login to '%s:%i' failed: %s" % (config.imap_server, config.imap_port, ', '.join(error_msgs)) @@ -615,7 +653,7 @@ def disconnect(self): logging.debug("Cannot close mailbox: %s" % ', '.join(ex.args)) pass finally: - del self.mailbox + self.mailbox = None @staticmethod def decode_body(msg) -> MailBody: @@ -696,7 +734,7 @@ def get_last_uid(self): if rv != 'OK': logging.info("No messages found!") return '' - return Tool.binary_to_string(data[0]) + return self.config.tool.binary_to_string(data[0]) def parse_mail(self, uid, mail) -> (MailData, None): """ @@ -707,7 +745,7 @@ def parse_mail(self, uid, mail) -> (MailData, None): # decode body data (text, html, multipart/attachments) body = self.decode_body(msg) - message_type = MailData.TEXT + message_type = MailDataType.TEXT content = '' if self.config.tg_forward_mail_content: @@ -725,11 +763,12 @@ def parse_mail(self, uid, mail) -> (MailData, None): '${file:' + body.images[cid.string].tg_id + '}' ) + bot = TelegramBot(self.config) if self.config.tg_prefer_html: # Prefer HTML if body.html: - message_type = MailData.HTML - content = TelegramBot.cleanup_html(body.html, body.images) + message_type = MailDataType.HTML + content = bot.cleanup_html(body.html, body.images) elif body.text: content = telegram.utils.helpers.escape_markdown(text=content, @@ -741,14 +780,14 @@ def parse_mail(self, uid, mail) -> (MailData, None): version=self.config.tg_markdown_version) elif body.html: - message_type = MailData.HTML - content = TelegramBot.cleanup_html(body.html, body.images) + message_type = MailDataType.HTML + content = bot.cleanup_html(body.html, body.images) if content: # remove multiple line breaks (keeping up to 1 empty line) content = re.sub(r'(\s*\r?\n){2,}', "\n\n", content) - if message_type == MailData.HTML: + if message_type == MailDataType.HTML: # add space after links (provide space for touch on link lists) # '<' keep mail marker together (ex.: <t@ex.xom>) content = re.sub(r'(?P(\s*>)?)\s*', '\g\n\n', content, flags=re.MULTILINE) @@ -759,7 +798,7 @@ def parse_mail(self, uid, mail) -> (MailData, None): max_len = self.config.imap_max_length content_len = len(content) - if message_type == MailData.HTML and content_len > 0: + if message_type == MailDataType.HTML and content_len > 0: # get length from parsed HTML (all tags removed) content_plain = re.sub(r'<[^>]*>', '', content, flags=re.MULTILINE) # get new max length based on plain text factor @@ -767,7 +806,7 @@ def parse_mail(self, uid, mail) -> (MailData, None): max_len = int(max_len * plain_factor) if content_len > max_len: content = content[:max_len] - if message_type == MailData.HTML: + if message_type == MailDataType.HTML: # remove incomplete html tag content = re.sub(r'<(\s*\w*(\s*[^>]*?)?(]*)?)?$', '', content) else: @@ -778,14 +817,14 @@ def parse_mail(self, uid, mail) -> (MailData, None): # attachment summary attachments_summary = "" if body.attachments: - if message_type == MailData.HTML: + if message_type == MailDataType.HTML: attachments_summary = "\n\n" + chr(10133) + \ " " + str(len(body.attachments)) + " attachments:\n" else: attachments_summary = "\n\n" + chr(10133) + \ " **" + str(len(body.attachments)) + " attachments:**\n" for attachment in body.attachments: - if message_type == MailData.HTML: + if message_type == MailDataType.HTML: file_name = attachment.name else: file_name = telegram.utils.helpers.escape_markdown( @@ -793,19 +832,17 @@ def parse_mail(self, uid, mail) -> (MailData, None): attachments_summary += "\n " + str(attachment.idx) + ": " + file_name # subject - subject = '' - for subject_part in email.header.decode_header(msg['Subject']): - part, encoding = subject_part - subject += Tool.binary_to_string(part, encoding=encoding) + subject = self.config.tool.decode_mail_data(msg['Subject']) # build summary - mail_from = Tool.binary_to_string(msg['From']) + mail_from = self.config.tool.decode_mail_data(msg['From']) + if self.config.tg_forward_mail_content: summary_line = "\n=============================\n" else: summary_line = "\n" - if message_type == MailData.HTML: + if message_type == MailDataType.HTML: mail_from = html.escape(mail_from, quote=True) email_text = "From: " + mail_from + "\nSubject: " else: @@ -834,9 +871,9 @@ def parse_mail(self, uid, mail) -> (MailData, None): except Exception as parse_error: if len(parse_error.args) > 0: - logging.critical(Tool.build_error_message("Cannot process mail: %s" % parse_error.args[0])) + logging.critical("Cannot parse mail: %s" % parse_error.args[0]) else: - logging.critical(Tool.build_error_message("Cannot process mail: %s" % parse_error.__str__())) + logging.critical("Cannot parse mail: %s" % parse_error.__str__()) return None def search_mails(self) -> [MailData]: @@ -865,7 +902,7 @@ def search_mails(self) -> [MailData]: return except imaplib2.IMAP4_SSL.error as search_error: - error_msgs = [Tool.binary_to_string(arg) for arg in search_error.args] + error_msgs = [self.config.tool.binary_to_string(arg) for arg in search_error.args] msg = "Search with '%s' returned: %s" % (search_string, ', '.join(error_msgs)) if msg != self.previous_error: logging.error(msg) @@ -875,7 +912,7 @@ def search_mails(self) -> [MailData]: except Exception as search_ex: msg = ', '.join(map(str, search_ex.args)) - logging.critical(Tool.build_error_message("Cannot search mail: %s" % msg)) + logging.critical("Cannot search mail: %s" % msg) self.disconnect() return self.MailError(msg) @@ -893,7 +930,7 @@ def search_mails(self) -> [MailData]: logging.info('Reading mails having UID greater than %s...' % self.last_uid) for num in sorted(data[0].split()): - current_uid = int(Tool.binary_to_string(num)) + current_uid = int(self.config.tool.binary_to_string(num)) if current_uid > max_num: try: @@ -903,15 +940,14 @@ def search_mails(self) -> [MailData]: return msg_raw = data[0][1] - mail = self.parse_mail(Tool.binary_to_string(num), msg_raw) + mail = self.parse_mail(self.config.tool.binary_to_string(num), msg_raw) if mail is None: logging.error("Can't parse mail with UID: %s" % num) else: mails.append(mail) except Exception as mail_error: - logging.critical(Tool.build_error_message("Cannot process mail: %s" - % ', '.join(map(str, mail_error.args)))) + logging.critical("Cannot process mail: %s" % ', '.join(map(str, mail_error.args))) finally: # remember new UID for next loop @@ -935,6 +971,7 @@ class SystemdHandler(logging.Handler): logging.DEBUG: "<7> " + __appname__ + ": ", logging.NOTSET: "<7> " + __appname__ + ": ", } + tool: Tool def __init__(self, stream=sys.stdout): self.stream = stream @@ -942,6 +979,9 @@ def __init__(self, stream=sys.stdout): def emit(self, record): try: + if self.tool is not None: + # Normalize message and replace sensitive data + record.msg = self.tool.build_error_message(record.msg) msg = self.PREFIX[record.levelno] + self.format(record) + "\n" self.stream.write(msg) self.stream.flush() @@ -957,9 +997,10 @@ def main(): """ Run the main program """ + sys_handler = SystemdHandler() root_logger = logging.getLogger() root_logger.setLevel("INFO") - root_logger.addHandler(SystemdHandler()) + root_logger.addHandler(sys_handler) args_parser = argparse.ArgumentParser(description='Mail to Telegram Forwarder') args_parser.add_argument('-c', '--config', type=str, help='Path to config file', required=True) @@ -972,8 +1013,12 @@ def main(): sys.exit(2) mailbox = None + last_try = time.time() + tool = Tool() + sys_handler.tool = tool try: - config = Config(cmd_args) + config = Config(tool, cmd_args) + sys_handler.mask_error_data = tool.mask_error_data tg_bot = TelegramBot(config) mailbox = Mail(config) @@ -984,7 +1029,16 @@ def main(): mailbox = Mail(config) else: if not mailbox.is_connected(): # reconnect on error (broken connection) - mailbox = Mail(config) + if last_try + 60 < time.time(): + mailbox = Mail(config) + if not mailbox.is_connected(): + time.sleep(20) + continue + else: + last_try = time.time() # new timeout on success + else: + time.sleep(20) + continue mails = mailbox.search_mails() @@ -1003,10 +1057,9 @@ def main(): except Mail.MailError as mail_ex: if len(mail_ex.args) > 0: - logging.critical(Tool.build_error_message('Error occurred [mail]: %s' - % ', '.join(map(str, mail_ex.args)))) + logging.critical('Error occurred [mail]: %s' % ', '.join(map(str, mail_ex.args))) else: - logging.critical(Tool.build_error_message('Error occurred [mail]: %s' % mail_ex.__str__())) + logging.critical('Error occurred [mail]: %s' % mail_ex.__str__()) if mailbox is not None: mailbox.disconnect() @@ -1016,10 +1069,9 @@ def main(): except Exception as loop_error: if len(loop_error.args) > 0: - logging.critical(Tool.build_error_message('Error occurred [loop]: %s' - % ', '.join(map(str, loop_error.args)))) + logging.critical('Error occurred [loop]: %s' % ', '.join(map(str, loop_error.args))) else: - logging.critical(Tool.build_error_message('Error occurred [loop]: %s' % loop_error.__str__())) + logging.critical('Error occurred [loop]: %s' % loop_error.__str__()) if mailbox is not None: mailbox.disconnect() @@ -1032,10 +1084,9 @@ def main(): except Exception as main_error: if len(main_error.args) > 0: - logging.critical(Tool.build_error_message('Error occurred [main]: %s' - % ', '.join(map(str, main_error.args)))) + logging.critical('Error occurred [main]: %s' % ', '.join(map(str, main_error.args))) else: - logging.critical(Tool.build_error_message('Error occurred [main]: %s' % main_error.__str__())) + logging.critical('Error occurred [main]: %s' % main_error.__str__()) finally: if mailbox is not None: