| from alerts.zen_custom import class_logger, threaded
|
| from alerts.mail_alert import MailAlert, NiagaraAlert
|
|
|
|
|
| from imaplib import IMAP4_SSL
|
| from threading import Event
|
|
|
|
|
| @class_logger
|
| class MailClient:
|
| def __init__(self, username, password, server, customers, alert_queue, alert_event, mailbox='INBOX', *args, **kwargs):
|
| self.username = username
|
| self.password = password
|
| self.server = server
|
| self.customers = customers
|
| self.alert_queue = alert_queue
|
| self.alert_event = alert_event
|
| self.mailbox = mailbox
|
|
|
| self.shutdown = Event()
|
| self.initialized = Event()
|
|
|
| @threaded
|
| def init_connection(self):
|
| """
|
| Initializes the connection to the mail server
|
| """
|
| self.logger.info('Initializing connection to mail server')
|
| with IMAP4_SSL(self.server) as connection:
|
| try:
|
| connection.login(self.username, self.password)
|
| except Exception as e:
|
| self.logger.error(f'Failed to connect to mail server: {e}')
|
|
|
| self.connection = connection
|
|
|
| if connection.state == 'AUTH':
|
| self.logger.info('[%s] Connection authenticated' % self.username)
|
| else:
|
| self.logger.error('[%s] Failed to authenticate connection' % self.username)
|
| self.shutdown.set()
|
|
|
| connection.select(self.mailbox)
|
|
|
| if connection.state == 'SELECTED':
|
| self.logger.info('[%s] Mailbox selected: %s' % (self.username, self.mailbox))
|
| self.initialized.set()
|
| else:
|
| self.logger.error('[%s] Failed to select mailbox: %s' % (self.username, self.mailbox))
|
| self.shutdown.set()
|
|
|
| self.shutdown.wait()
|
| self.shutdown.clear()
|
| self.logger.info('[%s] Connection closed' % self.username)
|
| self.initialized.clear()
|
|
|
| def close_connection(self):
|
| """
|
| Closes the connection to the mail server
|
| """
|
| if hasattr(self, 'connection') and self.connection.state == 'LOGOUT':
|
| self.logger.info('[%s] Connection in LOGOUT state' % self.username)
|
| self.logger.debug('[%s] Connection state: %s' % (self.username, self.connection.state))
|
| self.logger.debug('[%s] Shutdown state: %s' % (self.username, self.shutdown.is_set()))
|
| return
|
|
|
| if not self.shutdown.isSet():
|
| self.logger.info("[%s] Sending shutdown signal to close the connection" % self.username)
|
| self.shutdown.set()
|
|
|
| def check_mail(self, single=False, checkonly=False):
|
| """
|
| Checks for new mail
|
| """
|
| close_after_done = False
|
| while not self.initialized.is_set():
|
| if single:
|
| self.logger.debug('[%s] Closing after single check' % self.username)
|
| close_after_done = True
|
| self.init_connection()
|
| self.initialized.wait()
|
|
|
| self.logger.debug('[%s] Checking for new mail' % self.username)
|
| try:
|
| status, data = self.connection.search(None, 'UNSEEN')
|
| if status != 'OK':
|
| raise Exception(f"Failed to check inbox, status: {status}")
|
| except Exception as e:
|
| self.logger.error('[%s] Failed to check for new mail, error: %s' % (self.username, e))
|
| self.close_connection()
|
| return
|
|
|
| unread_messages = None
|
| if not data[0]:
|
| self.logger.debug('No new mail')
|
| else:
|
| try:
|
| unread_messages = data[0].decode().split(' ')
|
| self.logger.debug('[%s] New mail found: %s' % (self.username, unread_messages))
|
| self.filter_mail(unread_messages)
|
| except Exception as e:
|
| self.logger.error('[%s] Failed to parse new mail, error: %s' % (self.username, e))
|
|
|
| if close_after_done:
|
| self.logger.info('[%s] Closing connection after single check' % self.username)
|
| self.close_connection()
|
|
|
| if checkonly:
|
| return unread_messages
|
|
|
| def filter_mail(self, message_numbers):
|
| """
|
| Filters unread messages based on the customers, and their specified alert types
|
| """
|
| from re import match
|
| # Capture the email address from the from field, between <>
|
| self.logger.debug('[%s] Filtering messages: %s' % (self.username, message_numbers))
|
| for message_number in message_numbers:
|
| status, from_header = self.connection.fetch(message_number, '(BODY[HEADER.FIELDS (FROM)])')
|
| if status != 'OK':
|
| self.logger.error('[%s] Failed to get header for message %s, error: %s' % (self.username, message_number, status))
|
| continue
|
|
|
| self.logger.debug("[%s] From header with status %s: %s" % (self.username, status, repr(from_header)))
|
| _, from_field = from_header[-2] # The from field should be the second to last, and should be a tuple
|
| from_str = from_field.decode()
|
| self.logger.debug('[%s] From string: %s' % (self.username, from_str))
|
|
|
| from_regex = r'^From: "?(?:.*?)"? <(\S+@\S+)>\s*|^From: (\S+@\S+)\s*'
|
| try:
|
| re_match = match(from_regex, from_str).groups()
|
| sender = re_match[0] if re_match[0] else re_match[1] # Get the first non-empty match
|
| self.logger.debug('[%s] Sender for message %s: %s' % (self.username, message_number, sender))
|
| except AttributeError:
|
| self.logger.warning('[%s] Failed to parse sender for message %s:\n%s-----' % (self.username, message_number, from_str))
|
| continue
|
|
|
| customer_data = None
|
| for customer, customer_config in self.customers.items():
|
| if sender in customer_config['addresses']:
|
| self.logger.info('[%s] Mail detected from %s (%s), customer type: %s' % (self.username, sender, customer, customer_config['type']))
|
| customer_data = (customer, customer_config)
|
|
|
| if customer_data:
|
| body = self.get_message_body(message_number)
|
| customer_name, customer_config = customer_data
|
| self.alert_queue.put(getattr(self, 'parse_%s' % customer_config['type'])(body, customer_data))
|
| self.alert_event.set()
|
| else:
|
| self.logger.warning('[%s] New mail from unknown sender: %s' % (self.username, sender))
|
|
|
| def get_message_body(self, message_number):
|
| """
|
| Gets the actual message content from the message number
|
| """
|
| from quopri import decodestring
|
|
|
| self.logger.debug('[%s] Attempting to get message body for message %s' % (self.username, message_number))
|
| status, data = self.connection.fetch(message_number, '(BODY[TEXT])')
|
|
|
| if status != 'OK':
|
| self.logger.error('[%s] Failed to get message body for message %s, error: %s' % (self.username, message_number, status))
|
| return
|
|
|
| self.logger.debug('[%s] Message body with status %s: %s' % (self.username, status, data))
|
| body = decodestring(data[0][1]).decode(errors='ignore')
|
| self.logger.debug('[%s] Message body for message %s: %s' % (self.username, message_number, body))
|
| return body
|
|
|
| def parse_raw(self, message, customer_data):
|
| alert = MailAlert(message, customer_data, logger=self.logger)
|
| self.logger.debug("[%s] Raw alert: %s" % (self.username, alert))
|
| return alert
|
|
|
| def parse_Niagara(self, message, customer_data):
|
| """
|
| Parses a Niagara message
|
| """
|
| alert = NiagaraAlert(message, customer_data, logger=self.logger)
|
| self.logger.debug('[%s] Niagara alert: %s' % (self.username, alert))
|
| return alert
|