This paste expires on 2023-08-14 15:06:00.842542. Repaste, or download this paste. Pasted through v1-api.

from alerts.zen_custom import class_logger, threaded
from alerts.mail_alert import MailAlert, NiagaraAlert
from imaplib import IMAP4_SSL
from threading import Event
@class_logger
class MailClient:
    """
    IMAP mail client that polls a mailbox for unread mail and queues alerts for known customers

    The IMAP connection is owned by the init_connection method and coordinated through three events:
      - initialized: set while the mailbox is selected and self.connection is usable
      - init_failed: set when a connection attempt ended without reaching that state
      - shutdown: set by close_connection to ask init_connection to log out and return

    NOTE(review): self.logger is never assigned in this class; it is presumably injected by @class_logger
    NOTE(review): @threaded presumably runs init_connection in a background thread and returns immediately,
    check_mail relies on that (it waits on the events right after calling init_connection)
    """
    # Seconds check_mail sleeps between checks for a failed connection attempt
    INIT_POLL_INTERVAL = 0.5

    def __init__(self, username, password, server, customers, alert_queue, alert_event, mailbox='INBOX', *args, **kwargs):
        """
        Stores the connection settings and creates the coordination events, no network I/O happens here

        username, password: credentials passed to IMAP login
        server: host name passed to IMAP4_SSL
        customers: mapping of customer name -> config, each config is read for its 'addresses' and 'type' keys
        alert_queue: object with a put() method, receives the parsed alerts
        alert_event: object with a set() method, signalled after every queued alert
        mailbox: mailbox to select after login
        *args, **kwargs: accepted and ignored
        """
        self.username = username
        self.password = password
        self.server = server
        self.customers = customers
        self.alert_queue = alert_queue
        self.alert_event = alert_event
        self.mailbox = mailbox
        self.shutdown = Event()
        self.initialized = Event()
        self.init_failed = Event()

    @threaded
    def init_connection(self):
        """
        Initializes the connection to the mail server and keeps it open until shutdown is signalled

        On success self.connection is assigned and initialized is set, then the method blocks on the
        shutdown event. On any failure init_failed is set so that a waiting check_mail can give up
        instead of blocking forever. initialized is always cleared before returning.
        """
        self.logger.info('Initializing connection to mail server')
        self.init_failed.clear()
        try:
            with IMAP4_SSL(self.server) as connection:
                if not self._login_and_select(connection):
                    self.init_failed.set()
                    return
                self.connection = connection
                self.initialized.set()
                self.shutdown.wait()
                self.shutdown.clear()
        except Exception as e:
            # Top level of the connection thread: nothing above this can handle the error
            if self.initialized.is_set():
                # The connection was up, so the error comes from tearing it down (e.g. logout on a dead socket)
                self.logger.error('[%s] Error while closing connection: %s' % (self.username, e))
            else:
                self.logger.error(f'Failed to connect to mail server: {e}')
                self.init_failed.set()
        finally:
            # Must run on every exit path, a stale initialized flag would make check_mail reuse a dead connection
            self.initialized.clear()
            self.logger.info('[%s] Connection closed' % self.username)

    def _login_and_select(self, connection):
        """
        Logs in and selects self.mailbox on the connection, returns True only when the mailbox is selected
        """
        try:
            connection.login(self.username, self.password)
        except (IMAP4_SSL.error, OSError) as e:
            self.logger.error(f'Failed to connect to mail server: {e}')
        if connection.state != 'AUTH':
            # SELECT is not legal before authentication, so stop here
            self.logger.error('[%s] Failed to authenticate connection' % self.username)
            return False
        self.logger.info('[%s] Connection authenticated' % self.username)
        connection.select(self.mailbox)
        if connection.state != 'SELECTED':
            self.logger.error('[%s] Failed to select mailbox: %s' % (self.username, self.mailbox))
            return False
        self.logger.info('[%s] Mailbox selected: %s' % (self.username, self.mailbox))
        return True

    @staticmethod
    def _fetch_payload(response):
        """
        Returns the literal of the first (envelope, literal) tuple in an IMAP fetch response, None if there is none

        imaplib mixes tuples and bare bytes in the response list, and returns [None] for a missing message
        """
        for part in response:
            if isinstance(part, tuple) and len(part) == 2:
                return part[1]
        return None

    def close_connection(self):
        """
        Closes the connection to the mail server by signalling the shutdown event

        Does nothing besides logging when the connection is already in the LOGOUT state
        """
        connection = getattr(self, 'connection', None)
        if connection is not None and connection.state == 'LOGOUT':
            self.logger.info('[%s] Connection in LOGOUT state' % self.username)
            self.logger.debug('[%s] Connection state: %s' % (self.username, connection.state))
            self.logger.debug('[%s] Shutdown state: %s' % (self.username, self.shutdown.is_set()))
            return
        if not self.shutdown.is_set():
            self.logger.info("[%s] Sending shutdown signal to close the connection" % self.username)
            self.shutdown.set()

    def check_mail(self, single=False, checkonly=False):
        """
        Checks for new mail

        Opens the connection first when it is not initialized. Unread messages are passed to filter_mail.

        single: when True and this call had to open the connection, the connection is closed again afterwards
        checkonly: when True the list of unread message numbers is returned (None when there are none)
                   NOTE(review): the messages are still passed to filter_mail in this mode, confirm this is intended

        Returns None unless checkonly is set, also when the connection or the search failed
        """
        close_after_done = False
        while not self.initialized.is_set():
            if single:
                self.logger.debug('[%s] Closing after single check' % self.username)
                close_after_done = True
            self.init_failed.clear()
            self.init_connection()
            # Wait for the connection, but give up as soon as the attempt is reported as failed
            while not self.initialized.wait(self.INIT_POLL_INTERVAL):
                if self.init_failed.is_set():
                    self.logger.error('[%s] Unable to check for new mail, connection was not initialized' % self.username)
                    return None
        self.logger.debug('[%s] Checking for new mail' % self.username)
        try:
            status, data = self.connection.search(None, 'UNSEEN')
            if status != 'OK':
                raise Exception(f"Failed to check inbox, status: {status}")
        except Exception as e:
            self.logger.error('[%s] Failed to check for new mail, error: %s' % (self.username, e))
            self.close_connection()
            return None
        unread_messages = None
        if not data[0]:
            self.logger.debug('No new mail')
        else:
            try:
                unread_messages = data[0].decode().split()
                self.logger.debug('[%s] New mail found: %s' % (self.username, unread_messages))
                self.filter_mail(unread_messages)
            except Exception as e:
                self.logger.error('[%s] Failed to parse new mail, error: %s' % (self.username, e))
        if close_after_done:
            self.logger.info('[%s] Closing connection after single check' % self.username)
            self.close_connection()
        if checkonly:
            return unread_messages
        return None

    def filter_mail(self, message_numbers):
        """
        Filters unread messages based on the customers, and their specified alert types

        For every message number the From header is fetched and the sender address is compared with the
        'addresses' of each customer. Messages from a known sender are parsed by the parse_<type> method
        named by the customer 'type' and the result is put on alert_queue, followed by alert_event.set().
        A message that cannot be processed is logged and skipped, it does not stop the remaining messages.
        """
        from email import message_from_string
        from email.utils import parseaddr
        self.logger.debug('[%s] Filtering messages: %s' % (self.username, message_numbers))
        for message_number in message_numbers:
            status, from_header = self.connection.fetch(message_number, '(BODY[HEADER.FIELDS (FROM)])')
            if status != 'OK':
                self.logger.error('[%s] Failed to get header for message %s, error: %s' % (self.username, message_number, status))
                continue
            self.logger.debug("[%s] From header with status %s: %s" % (self.username, status, repr(from_header)))
            from_field = self._fetch_payload(from_header)
            if from_field is None:
                self.logger.warning('[%s] No From header returned for message %s' % (self.username, message_number))
                continue
            # Undecodable bytes must not abort the batch, the address part is ASCII anyway
            from_str = from_field.decode(errors='replace')
            self.logger.debug('[%s] From string: %s' % (self.username, from_str))
            # The header parser handles any header name casing and folded lines,
            # parseaddr handles quoted display names and bare <address> forms
            sender = parseaddr(str(message_from_string(from_str).get('From', '')))[1]
            if not sender:
                self.logger.warning('[%s] Failed to parse sender for message %s:\n%s-----' % (self.username, message_number, from_str))
                continue
            self.logger.debug('[%s] Sender for message %s: %s' % (self.username, message_number, sender))
            customer_data = None
            # No break: when several customers list the same address the last one wins
            for customer, customer_config in self.customers.items():
                if sender in customer_config['addresses']:
                    self.logger.info('[%s] Mail detected from %s (%s), customer type: %s' % (self.username, sender, customer, customer_config['type']))
                    customer_data = (customer, customer_config)
            if customer_data is None:
                self.logger.warning('[%s] New mail from unknown sender: %s' % (self.username, sender))
                continue
            customer_type = customer_data[1]['type']
            parser = getattr(self, 'parse_%s' % customer_type, None)
            if parser is None:
                self.logger.error('[%s] No parser for customer type %s, skipping message %s' % (self.username, customer_type, message_number))
                continue
            body = self.get_message_body(message_number)
            if body is None:
                self.logger.error('[%s] No body for message %s, skipping' % (self.username, message_number))
                continue
            self.alert_queue.put(parser(body, customer_data))
            self.alert_event.set()

    def get_message_body(self, message_number):
        """
        Gets the actual message content from the message number

        Returns the decoded body text, or None when the fetch failed or returned no body
        NOTE(review): the body is always decoded as quoted-printable, the Content-Transfer-Encoding of the
        message is not checked
        """
        from quopri import decodestring
        self.logger.debug('[%s] Attempting to get message body for message %s' % (self.username, message_number))
        status, data = self.connection.fetch(message_number, '(BODY[TEXT])')
        if status != 'OK':
            self.logger.error('[%s] Failed to get message body for message %s, error: %s' % (self.username, message_number, status))
            return None
        self.logger.debug('[%s] Message body with status %s: %s' % (self.username, status, data))
        payload = self._fetch_payload(data)
        if payload is None:
            self.logger.error('[%s] No message body returned for message %s' % (self.username, message_number))
            return None
        body = decodestring(payload).decode(errors='ignore')
        self.logger.debug('[%s] Message body for message %s: %s' % (self.username, message_number, body))
        return body

    def parse_raw(self, message, customer_data):
        """
        Wraps a message body in a MailAlert without further parsing in this class
        """
        alert = MailAlert(message, customer_data, logger=self.logger)
        self.logger.debug("[%s] Raw alert: %s" % (self.username, alert))
        return alert

    def parse_Niagara(self, message, customer_data):
        """
        Parses a Niagara message
        """
        alert = NiagaraAlert(message, customer_data, logger=self.logger)
        self.logger.debug('[%s] Niagara alert: %s' % (self.username, alert))
        return alert
Filename: stdin. Size: 8kb. View raw, hex, or download this file.