
Asyncio imap fetch mails python3


If you don't have an asynchronous I/O-based IMAP library, you can use a concurrent.futures.ThreadPoolExecutor to do the I/O in threads. Python releases the GIL during blocking I/O, so the fetches really do run concurrently:

    import asyncio
    import email
    import imaplib
    import threading
    from concurrent.futures import ThreadPoolExecutor

    def init_connection(d):
        username = d['usern']
        password = d['passw']
        connection = imaplib.IMAP4_SSL('imap.bar.de')
        connection.login(username, password)
        connection.select()
        return connection

    local = threading.local()  # We use this to get a different connection per thread

    def do_fetch(num, d, rfc):
        # Caveat: the cache is keyed by thread only, not by account.
        try:
            connection = local.connection
        except AttributeError:
            connection = local.connection = init_connection(d)
        return connection.fetch(num, rfc)

    async def get_attachment(d, pool):
        connection = init_connection(d)
        # list all available mails
        typ, data = connection.search(None, 'ALL')
        # Kick off asynchronous tasks for all the fetches.
        # run_in_executor already returns an asyncio future, so no wrapping is needed.
        loop = asyncio.get_running_loop()
        futs = [loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)')
                for num in data[0].split()]
        # Process each fetch as it completes
        for fut in asyncio.as_completed(futs):
            typ, data = await fut
            # Parse the raw bytes directly; not every mail is valid UTF-8
            msg = email.message_from_bytes(data[0][1])
            for part in msg.walk():
                if part.get_content_maintype() == 'multipart':
                    continue
                if part.get('Content-Disposition') is None:
                    continue
                if part.get_filename():
                    body = part.get_payload(decode=True)
                    # do something with the body, async?
        connection.close()
        connection.logout()

    async def main(accounts):
        # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
        with ThreadPoolExecutor(max_workers=5) as pool:
            await asyncio.gather(*(get_attachment(d, pool) for d in accounts))

    # accounts: list of dicts with 'usern' and 'passw' keys, defined elsewhere
    asyncio.run(main(accounts))

This isn't quite as nice as a truly asynchronous I/O-based solution, because you still pay for creating the threads, which limits scalability and adds memory overhead. You also get some GIL contention from the Python code wrapping the actual I/O calls. Still, if you're dealing with fewer than a few thousand mails, it should perform OK.

We use run_in_executor to run the blocking fetches on the ThreadPoolExecutor from within the asyncio event loop; each call returns a future that the event loop can wait on. as_completed then lets us iterate through those futures in the order they complete.
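To try the run_in_executor / as_completed pattern without a mail server, here is a minimal sketch. The names blocking_fetch and fetch_all are made up for this illustration, and time.sleep stands in for the blocking imaplib call, so treat it as a demonstration of the mechanism rather than a drop-in replacement.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_fetch(num, delay):
    """Hypothetical stand-in for a blocking call such as imaplib's fetch."""
    time.sleep(delay)  # sleeping releases the GIL, much like socket I/O does
    return num


async def fetch_all(jobs, max_workers=5):
    """Run blocking jobs in a thread pool; collect results as they finish."""
    loop = asyncio.get_running_loop()
    finished = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # run_in_executor returns one asyncio future per submitted call.
        futs = [loop.run_in_executor(pool, blocking_fetch, num, delay)
                for num, delay in jobs]
        # as_completed yields awaitables in completion order,
        # not in submission order.
        for fut in asyncio.as_completed(futs):
            finished.append(await fut)
    return finished


if __name__ == "__main__":
    # Job 1 is the slowest, so it should show up last in the result.
    print(asyncio.run(fetch_all([(1, 0.6), (2, 0.05), (3, 0.3)])))
```

Because the three sleeps overlap in separate threads, the whole run should take roughly as long as the slowest job rather than the sum of all three.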

Edit:

It seems imaplib is not thread-safe. I've edited my answer to use thread-local storage via threading.local, which gives each thread its own connection object that is re-used for the entire life of the thread (meaning you create only max_workers connection objects, rather than a new connection for every fetch).
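The thread-local idea can be checked in isolation. The sketch below is only an illustration: FakeConnection, get_connection and run_fetches are hypothetical names, and FakeConnection merely records which thread created it instead of talking to a server.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()
created = []                      # every FakeConnection built so far
created_lock = threading.Lock()


class FakeConnection:
    """Hypothetical stand-in for imaplib.IMAP4_SSL (assumed not thread-safe)."""

    def __init__(self):
        self.owner = threading.get_ident()

    def fetch(self, num):
        # A real connection would talk to the server here. The check shows
        # that the object is only ever used by the thread that created it.
        if self.owner != threading.get_ident():
            raise RuntimeError("connection used from the wrong thread")
        return num


def get_connection():
    """Return this thread's connection, creating it on first use."""
    try:
        return local.connection
    except AttributeError:
        conn = local.connection = FakeConnection()
        with created_lock:
            created.append(conn)
        return conn


def do_fetch(num):
    return get_connection().fetch(num)


def run_fetches(jobs=50, max_workers=5):
    """Run many fetches and report how many connections were created."""
    created.clear()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(do_fetch, range(jobs)))
    return results, len(created)


if __name__ == "__main__":
    results, n_connections = run_fetches()
    print(len(results), "fetches used", n_connections, "connections")
```

However many fetches are submitted, the number of connections never exceeds max_workers, because each worker thread builds its connection once and then keeps re-using it.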


I had the same need: fetching emails with Python 3, fully async. If others here are interested, I pushed an asyncio IMAP lib here: https://github.com/bamthomas/aioimaplib

You can use it like this:

    import asyncio
    import email

    from aioimaplib import aioimaplib

    # The aioimaplib call names below are the ones used when this answer was written.
    async def wait_for_new_message(host, user, password):
        imap_client = aioimaplib.IMAP4(host=host)
        await imap_client.wait_hello_from_server()
        await imap_client.login(user, password)
        await imap_client.select()
        # Schedule the IDLE command in the background
        # (asyncio.async() has since been replaced by asyncio.ensure_future())
        asyncio.ensure_future(imap_client.idle())
        msg_id = 0
        while True:
            msg = await imap_client.wait_server_push()
            print('--> received from server: %s' % msg)
            if 'EXISTS' in msg:
                msg_id = msg.split()[0]
                imap_client.idle_done()
                break
        result, data = await imap_client.fetch(msg_id, '(RFC822)')
        email_message = email.message_from_bytes(data[0])
        attachments = []
        body = ''
        for part in email_message.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
                body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
            else:
                attachments.append(
                    {'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})
        print('attachments : %s' % attachments)
        print('body : %s' % body)
        await imap_client.logout()

    if __name__ == '__main__':
        asyncio.run(wait_for_new_message('my.imap.server', 'user', 'pass'))

Large emails with attachments are also downloaded with asyncio.
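The message-walking step that both answers share can be tried offline with only the standard library. This is a small sketch under stated assumptions: build_sample_message and extract_parts are names invented here, the sample mail is constructed locally instead of being fetched, and the attachment size is measured on the decoded payload.

```python
from email import message_from_bytes
from email.message import EmailMessage


def build_sample_message():
    """Build a small mail with one text body and one attachment (sample data)."""
    msg = EmailMessage()
    msg["Subject"] = "Report"
    msg["From"] = "a@example.com"
    msg["To"] = "b@example.com"
    msg.set_content("See the attached file.")
    msg.add_attachment(b"col1,col2\n1,2\n", maintype="application",
                       subtype="octet-stream", filename="data.csv")
    return msg.as_bytes()


def extract_parts(raw):
    """Split raw RFC 822 bytes into the text body and a list of attachments."""
    msg = message_from_bytes(raw)
    body = ""
    attachments = []
    for part in msg.walk():
        if part.get_content_maintype() == "multipart":
            continue  # containers carry no payload of their own
        disposition = part.get("Content-Disposition", "")
        if part.get_content_maintype() == "text" and "attachment" not in disposition:
            charset = part.get_param("charset", "ascii")
            body = part.get_payload(decode=True).decode(charset).strip()
        elif part.get_filename():
            payload = part.get_payload(decode=True)
            attachments.append({"filename": part.get_filename(),
                                "type": part.get_content_type(),
                                "size": len(payload)})
    return body, attachments


if __name__ == "__main__":
    print(extract_parts(build_sample_message()))
```

In a real client, the bytes returned by the fetch call would take the place of build_sample_message().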