Source code for pisak.email.parsers

"""
Email parsers.
"""
import email
import time
from datetime import datetime

from pisak import logger


_LOG = logger.get_logger(__name__)


DEFAULT_CHARSET = "utf-8"


def _decode_message(message):
    """
    Decode content of the message.

    :param message: non-multipart message object.

    :return: decoded content of the message
    """
    content = message.get_payload(decode=True)
    charset = message.get_content_charset(DEFAULT_CHARSET)
    return content.decode(charset, "replace")


def _get_addresses(value, header=None):
    """
    Extract all addresses from a header with the given name.

    :params value: single string with raw header value or
    `email.message.Message` instance.
    :param header: header name, obligatory when 'value' is an
    `email.message.Message` instance.

    :return: in case when 'value' param is an `email.message.Message`instance
    then returns list of tuples containing name and address for each record,
    if 'value' is a string then returns a single tuple of this kind.
    """
    if isinstance(value, str):
        return email.utils.parseaddr(value)
    elif isinstance(value, email.message.Message):
        return email.utils.getaddresses(value.get_all(header, []))
    else:
        _LOG.error("Invalid argument 'value'. Only string or "
                   "'email.message.Message' instance are accepted.")


def _parse_date(raw_date):
    """
    Parse date of the message.

    :param raw_date: raw date string.

    :return: datetime object or None in case of parsing failure
    """
    date_tuple = email.utils.parsedate(raw_date)
    return datetime.fromtimestamp(time.mktime(date_tuple)) \
            if date_tuple else None


def _decode_header(header):
    """
    Decode the given header with charsets supplied within or
    with the default charset.

    :param header: raw header.

    :return: single string with a decoded header.
    """
    try:
        headers = email.header.decode_header(header)
    except email.errors.HeaderParseError:
        return header.encode(DEFAULT_CHARSET, "replace").decode(
            DEFAULT_CHARSET, "replace")
    else:
        for idx, (value, charset) in enumerate(headers):
            if isinstance(value, bytes):
                headers[idx] = value.decode(
                    charset or DEFAULT_CHARSET, "replace")
            elif isinstance(value, str):
                headers[idx] = value.encode(
                    charset or DEFAULT_CHARSET, "replace").decode(
                    charset or DEFAULT_CHARSET, "replace")
        return "".join(headers)


[docs]def parse_message(raw_message): """ Parse the given raw message. :param raw_message: single string with the whole raw message. :return: dictionary containing all fields of parsed message. """ parsed_msg = {} msg = email.message_from_string(raw_message) content_type = msg.get_content_maintype() body = [] # look for plain text message body if content_type == "multipart": for part in msg.walk(): is_inline = part.get("Content-Disposition") in (None, "inline") if part.get_content_type() == "text/plain" and is_inline: body.append(_decode_message(part)) elif content_type in ("text/plain", "text"): body.append(_decode_message(msg)) parsed_msg["Body"] = "\n".join(body) # put all the message fields into a dictionary parsed_msg.update(msg.items()) # look for all the addresses parsed_msg["To"] = _get_addresses(msg, "To") parsed_msg["From"] = _get_addresses(msg, "From") # decode some headers that may need that for header in ["Subject", "Date", "Message-ID"]: if header in parsed_msg: parsed_msg[header] = _decode_header(parsed_msg.get(header)) # convert date into datetime object if possible date = _parse_date(parsed_msg.get("Date")) if date: parsed_msg["Date"] = date return parsed_msg
[docs]def parse_mailbox_list(ids, msg_data, headers): """ Parse list of message previews. :param ids: list of the given messages ids. :param msg_data: raw messages data. :param headers: list of headers to be parsed. :return: list of dictionaries containing parsed message previews. """ mailbox_list = [] for idx, (_spec, msg) in enumerate(reversed(msg_data[::2])): parsed_msg = {"UID": ids[idx]} str_msg = msg.decode(DEFAULT_CHARSET, "replace") for header_name in headers: parsed_header = _decode_header(str_msg[ str_msg.find(header_name) + len(header_name)+1: ].split("\r\n")[0]) if header_name == "Date": parsed_msg[header_name] = _parse_date(parsed_header) or \ parsed_header elif header_name in ("From", "To"): parsed_msg[header_name] = _get_addresses(parsed_header, header_name) else: parsed_msg[header_name] = parsed_header mailbox_list.append(parsed_msg) return mailbox_list