Today we’re going to explore Python’s asyncio library. This library provides an interface to write concurrent code.

Because it’s part of Python, asyncio makes it easy to prototype small code snippets while learning these concepts.



There are a lot of concepts you need to wrap your head around to be truly proficient with async programming.

For example, you should be able to explain the difference between threads and processes and what type of tasks each is best suited for.

Concepts such as coroutines, async/await, generators, and event loops are also helpful to understand.

Hence I think we should work backwards for this topic.

Code-complete Example

Let’s start with an example and work through the async components.

You should be able to copy-paste the code and run it as is.

import asyncio
import concurrent.futures
import functools
import time

from typing import Any, Callable, Generator, List

##
# Helper functions
#

def async_timed():  # [8]
    def wrapper(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapped(*args, **kwargs) -> Any:
            start = time.time()
            try:
                return await func(*args, **kwargs)  # [8]
            finally:
                end = time.time()
                total = end - start
            
                print(f"Finished {func} in {total:.4f} second(s).")
        return wrapped
    return wrapper

def partition(
        data: List,
        chunk_size: int
    ) -> Generator[List, None, None]:
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

##
# Asynchronous Execution
#

@async_timed()
async def execute_tasks( # [1]
    task_items: List[int],
    chunk_size: int,
) -> List[List[int]]:
    loop = asyncio.get_running_loop() # [2]
    tasks = []

    with concurrent.futures.ProcessPoolExecutor() as pool: # [3]
        for i, task_partition in enumerate(partition(task_items, chunk_size=chunk_size)):
            tasks.append(loop.run_in_executor(  # [4]
                pool, 
                functools.partial( # [5]
                    task,
                    task_item=task_partition,
                    partition=i,
                )
            ))
        completed_tasks: List[List[int]] = await asyncio.gather(*tasks)  # [6]
        return [t for task_arr in completed_tasks for t in task_arr]

def task(
    task_item: List[int],
    partition: int,
):
    # Simulate the task
    time.sleep(1)
    print(f"i: {partition} | {time.time()}")
    return [x * partition for x in task_item]

if __name__ == "__main__":
    task_items = [i for i in range(100)]
    chunk_size = 10
    
    async_results = asyncio.run(execute_tasks(  # [7]
        task_items=task_items,
        chunk_size=chunk_size,
    ))
    
    # Comparison with synchronous execution.
    start = time.time()
    results = []
    for i, task_partition in enumerate(partition(task_items, chunk_size=chunk_size)):
        results += task(
            task_item=task_partition,
            partition=i,
        )
    end = time.time()
    print(f"Finished synchronous task in {(end - start):.4f} second(s).")

This snippet splits a list of 100 integers into 10 chunks and processes them in parallel. How many chunks run at the same time depends on how many worker processes the ProcessPoolExecutor creates (by default, one per processor on the machine).

If the pool has 2 processes, we execute 2 chunks at once and finish in ~5 seconds, instead of the ~10 seconds the synchronous loop takes (each chunk sleeps for 1 second).
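To make the ~5 second figure concrete, here is a back-of-the-envelope sketch. estimated_wall_time is a hypothetical helper (not part of asyncio or concurrent.futures), and it assumes every chunk takes the same time while ignoring process start-up and pickling overhead. To reproduce the 2-process scenario in the real example, you could pass max_workers=2 to ProcessPoolExecutor.

```python
import math


def estimated_wall_time(n_chunks: int, n_workers: int, secs_per_chunk: float) -> float:
    # Hypothetical helper: chunks run in "waves" of n_workers at a time,
    # and each wave takes roughly secs_per_chunk.
    # Assumption: equal-length chunks, no start-up or pickling overhead.
    waves = math.ceil(n_chunks / n_workers)
    return waves * secs_per_chunk


print(estimated_wall_time(10, 1, 1.0))  # 10.0 (one worker, i.e. synchronous)
print(estimated_wall_time(10, 2, 1.0))  # 5.0
print(estimated_wall_time(10, 4, 1.0))  # 3.0 (the last wave is only partly full)
```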

Let’s discuss the concepts that I’ve numbered.

[1] Adding async in front of def

Prepending async to a function def states that this function is intended to be executed asynchronously. More precisely, it declares a coroutine function: calling it returns a coroutine object instead of running the body right away.

What is a coroutine? Hmmm…

Normally, when one function calls another, e.g.

def f():
    print("f() is executing.")
    print("g() will regain control once f() is finished.")

def g():
    f()
    print("Control is passed back to g()")

g()

the control flow will remain with f() until it finishes execution, upon which control is returned to g() and it continues.

With coroutines, control can pass back and forth between the two functions. Each await is a point where a coroutine may suspend and hand control back to the event loop, which can then resume a different coroutine. This is what makes it asynchronous: we can let f() make progress in the background while g() resumes without waiting for f() to finish.
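Here is a minimal sketch of that back-and-forth. The worker coroutine and the log list are illustrative names, not from the original example; await asyncio.sleep(0) is used purely as a way to suspend and give control back to the event loop, which then runs the other coroutine.

```python
import asyncio

log = []  # records the order in which the coroutines make progress


async def worker(name: str, steps: int) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        # Suspend here and let the event loop run the other coroutine.
        await asyncio.sleep(0)


async def main() -> None:
    # gather wraps both coroutines in tasks and runs them concurrently.
    await asyncio.gather(worker("f", 3), worker("g", 3))


asyncio.run(main())
print(log)  # ['f0', 'g0', 'f1', 'g1', 'f2', 'g2']
```

Compare this with the plain function call above, where g() cannot do anything until f() has returned.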

[2] event loops

The event loop manages the execution of asynchronous tasks, switching between them whenever one is suspended waiting for a Future to complete.

Here is where the async/await concept comes in. When we start an asynchronous task, we don’t sit around waiting for the result; we go do something else and come back later. The object that stands in for that eventual result is called a Future.

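The await keyword essentially says: pause this coroutine here until the awaited task or Future finishes. While it is paused, the event loop is free to run other tasks.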
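As a small sketch of a Future in isolation (not something the main example does directly), we can create an empty Future on the running loop, schedule its result to be filled in later with call_later, and await it. The 0.1 second delay and the "ready" value are arbitrary choices for illustration.

```python
import asyncio


async def main() -> str:
    loop = asyncio.get_running_loop()
    # An empty placeholder for a result that doesn't exist yet.
    fut = loop.create_future()
    # Ask the loop to fill in the result 0.1 s from now.
    loop.call_later(0.1, fut.set_result, "ready")
    # main() is suspended here; the loop is free to run other work meanwhile.
    return await fut


result = asyncio.run(main())
print(result)  # ready
```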

If you want coroutines to run concurrently, you need to explicitly wrap them in tasks and then await them.

import asyncio
import time

async def delayed_task(delay_secs, text):
    await asyncio.sleep(delay_secs)
    print(text)

async def main():
    task1 = asyncio.create_task(
        delayed_task(1, 'penguin'))

    task2 = asyncio.create_task(
        delayed_task(2, 'engineering'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

If you had simply awaited each delayed_task(...) call directly, without wrapping it in a task, the two calls would have run one after the other, taking about 3 seconds instead of 2.
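The following sketch times both styles side by side. It reuses the delayed_task idea but returns the text instead of printing it, and shrinks the delays to 0.1 and 0.2 seconds (an arbitrary choice) so it runs quickly.

```python
import asyncio
import time


async def delayed_task(delay_secs: float, text: str) -> str:
    await asyncio.sleep(delay_secs)
    return text


async def run_sequentially() -> float:
    start = time.perf_counter()
    # The second coroutine isn't even created until the first one finishes.
    await delayed_task(0.1, "penguin")
    await delayed_task(0.2, "engineering")
    return time.perf_counter() - start


async def run_concurrently() -> float:
    start = time.perf_counter()
    # create_task schedules both coroutines on the event loop immediately.
    task1 = asyncio.create_task(delayed_task(0.1, "penguin"))
    task2 = asyncio.create_task(delayed_task(0.2, "engineering"))
    await task1
    await task2
    return time.perf_counter() - start


seq_time = asyncio.run(run_sequentially())   # roughly 0.1 + 0.2 = 0.3 s
conc_time = asyncio.run(run_concurrently())  # roughly max(0.1, 0.2) = 0.2 s
print(f"sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```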

Most of the time you won’t need to manipulate the event loop. Instead, you should be able to call asyncio.run() to execute your tasks.

However, as you can see at [2] and [4], our example does need the loop, because run_in_executor() is a method of the event loop object.

[3] concurrent.futures.ProcessPoolExecutor()

concurrent.futures is the Python standard-library module that provides a high-level interface for running callables asynchronously in pools of threads or processes.

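Here we use the ProcessPoolExecutor because we have CPU-intensive tasks, whereas its cousin, ThreadPoolExecutor, is better suited for I/O-bound tasks (in standard CPython, the GIL prevents threads from executing Python code in parallel, but threads blocked on I/O can still overlap). Creating a pool offloads the management of parallel processes, because we delegate the assignment of tasks to workers to the Executor.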
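For comparison, here is a minimal sketch of the ThreadPoolExecutor used on its own, without asyncio. fake_io is a hypothetical stand-in for an I/O call: time.sleep releases the GIL, so the four calls can overlap in separate threads.

```python
import concurrent.futures
import time


def fake_io(delay_secs: float) -> float:
    # Hypothetical I/O stand-in: sleeping releases the GIL, like waiting on a socket.
    time.sleep(delay_secs)
    return delay_secs


start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    # map submits one call per item and yields results in input order.
    results = list(pool.map(fake_io, [0.2, 0.2, 0.2, 0.2]))
elapsed = time.perf_counter() - start

# Four 0.2 s calls overlap, so this takes about 0.2 s rather than 0.8 s.
print(results, f"{elapsed:.2f}s")
```

Swapping in ProcessPoolExecutor would also work here, but for sleep-style waiting the extra cost of starting processes buys nothing.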

At [3] we create the pool as a context manager (with concurrent.futures.ProcessPoolExecutor() as pool:), so it is shut down automatically when the block exits, and we submit the tasks to it via run_in_executor.

[4] loop.run_in_executor()

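Here we pass run_in_executor() two things: the executor to run in (our pool; any subclass of the parent concurrent.futures.Executor class works) and the Callable to execute. It returns an asyncio Future that we can await later. To simplify things, you can think of Callables as functions.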
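A minimal sketch of run_in_executor() outside the main example: passing None as the executor uses the loop’s default executor (a ThreadPoolExecutor), which is enough for a blocking, I/O-style function. blocking_io is a hypothetical stand-in. Note that run_in_executor() only forwards positional arguments after the callable.

```python
import asyncio
import time
from typing import List


def blocking_io(delay_secs: float) -> str:
    # Hypothetical blocking call; it would freeze the event loop if called directly.
    time.sleep(delay_secs)
    return f"slept {delay_secs}s"


async def main() -> List[str]:
    loop = asyncio.get_running_loop()
    # None -> the loop's default ThreadPoolExecutor; 0.2 is passed positionally.
    futures = [loop.run_in_executor(None, blocking_io, 0.2) for _ in range(3)]
    # Each call returned an asyncio Future immediately; now wait for all of them.
    return await asyncio.gather(*futures)


results = asyncio.run(main())
print(results)  # ['slept 0.2s', 'slept 0.2s', 'slept 0.2s']
```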

[5] functools.partial()

Partial functions are a useful concept: you fix some of a function’s arguments in advance and leave the rest to be supplied when the function is eventually called.

import functools

def general(a, b, c):
    return a + b + c

specific = functools.partial(general, a=1, b=2)
print(specific(c=5))
# 1 + 2 + 5 = 8 

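Here, we pass the Callable as a partial function because run_in_executor() only forwards positional arguments, and we want to pass task_item and partition by keyword. Intuitively this makes sense too: we’re not executing the function right now, but later inside one of the worker processes, so we need a way to hold on to its arguments until then.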
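The sketch below peeks inside such a partial object. task mirrors the article’s task function without the sleep and print. Because a ProcessPoolExecutor has to send the callable to another process, it must survive pickling; a partial over a module-level function does, and the round trip below simulates that hand-off.

```python
import functools
import pickle


def task(task_item, partition):
    # Same computation as the article's task(), minus the simulated delay.
    return [x * partition for x in task_item]


job = functools.partial(task, task_item=[1, 2, 3], partition=2)
# The partial stores the function and its keyword arguments for later.
print(job.func.__name__, job.keywords)  # task {'task_item': [1, 2, 3], 'partition': 2}

# Simulate sending the job to a worker process and running it there.
restored = pickle.loads(pickle.dumps(job))
print(restored())  # [2, 4, 6]
```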

[6] await asyncio.gather(*tasks)

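asyncio.gather() collects the results of all the awaitables passed to it into a list, in the same order as the inputs (not in completion order). Awaiting it ensures everything is finished before we continue; the list comprehension after it then flattens the per-chunk lists into a single list.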
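A quick sketch of the ordering guarantee: delayed_value is a hypothetical helper, and the delays are chosen so the awaitables finish in reverse order, yet gather still returns their results in the order they were passed in.

```python
import asyncio
from typing import List


async def delayed_value(delay_secs: float, value: int) -> int:
    await asyncio.sleep(delay_secs)
    return value


async def main() -> List[int]:
    # The first awaitable finishes last, but results keep input order.
    return await asyncio.gather(
        delayed_value(0.3, 1),
        delayed_value(0.2, 2),
        delayed_value(0.1, 3),
    )


values = asyncio.run(main())
print(values)  # [1, 2, 3]
```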

[7] asyncio.run(execute_tasks(…))

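Finally, we let it loose with asyncio.run(). Calling an async function doesn’t run its body; it only creates a coroutine object, so we need to be explicit. asyncio.run() creates a new event loop, runs the coroutine until it completes, returns its result, and closes the loop.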
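A minimal sketch showing that calling an async function only produces a coroutine object, and that asyncio.run() is what actually executes it. greet is a hypothetical example function.

```python
import asyncio
import inspect


async def greet(name: str) -> str:
    return f"hello, {name}"


coro = greet("penguin")           # nothing inside greet() has run yet
print(inspect.iscoroutine(coro))  # True
result = asyncio.run(coro)        # the event loop runs the body to completion
print(result)                     # hello, penguin
```

If a coroutine object is created but never awaited or run, Python emits a "coroutine ... was never awaited" RuntimeWarning when it is garbage-collected, which is a handy hint that a call site forgot the await.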

[8] def async_timed()

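To test my understanding and confirm that my asynchronous code really speeds things up, I wrote a timing decorator that works with async functions. Note the use of async and await: the wrapper is itself a coroutine function, so it can await the decorated coroutine and measure how long it takes to finish, not just how long it takes to create the coroutine object.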
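To see why the wrapper has to be async, here is a sketch comparing a plain wrapper with an awaiting one. Both decorators are simplified, hypothetical variants of async_timed (no outer factory, time.perf_counter instead of time.time, timings stored in a dict instead of printed).

```python
import asyncio
import functools
import time
from typing import Any, Callable, Dict

timings: Dict[str, float] = {}


def timed_sync_wrapper(func: Callable) -> Callable:
    # Plain wrapper: calling an async function only creates a coroutine
    # object, so this measures almost nothing.
    @functools.wraps(func)
    def wrapped(*args, **kwargs) -> Any:
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            timings["sync_wrapper"] = time.perf_counter() - start
    return wrapped


def timed_coroutine(func: Callable) -> Callable:
    # Coroutine wrapper: awaits the decorated coroutine, so it measures real run time.
    @functools.wraps(func)
    async def wrapped(*args, **kwargs) -> Any:
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            timings["coroutine_wrapper"] = time.perf_counter() - start
    return wrapped


async def nap() -> str:
    await asyncio.sleep(0.2)
    return "done"


naive_result = asyncio.run(timed_sync_wrapper(nap)())
awaited_result = asyncio.run(timed_coroutine(nap)())
print(timings)  # sync_wrapper is close to 0 s, coroutine_wrapper is about 0.2 s
```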

Conclusion

That’s it!

This was one of the first async pieces of code I wrote, and it helped me understand the basic concepts. As an added bonus, it also sped up my data processing step by 5x. Worth the adventure!