Async programming in Python

https://compro-prasad.gitlab.io

What causes delays when executing a program?

Waiting for program compilation

Waiting for keyboard input

Waiting to copy files to a storage device

Waiting to upload / download data over network

Waiting for scheduled tasks

Thinking about parallel execution

  1. Threads
  2. Processes
  3. Coroutines

Prerequisites

Prerequisite - time command

time sleep 2
real	0m2.002s
user	0m0.000s
sys	0m0.001s

We will only look at the real (wall-clock) time.

Prerequisite - Functions as objects

def identity(x):
    return x
print(sorted([2, 1, 3], key=identity))
[1, 2, 3]

Threads

  1. OS threads
  2. Shared globals
  3. Can use multiple cores
    1. but the GIL lets only one thread run Python bytecode at a time
    2. experimental free-threading support (Python 3.13+)
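
The "shared globals" point can be sketched as follows. This is a minimal illustration, not part of the original slides: the names counter, counter_lock and increment are made up for the example. Every thread updates the same module-level variable, and a lock keeps the read-modify-write step safe.

```python
import threading

counter = 0                      # module-level global, visible to every thread
counter_lock = threading.Lock()  # guards the read-modify-write on counter

def increment(times):
    global counter
    for _ in range(times):
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()                     # wait for all four threads to finish

print(counter)
```

All four threads write to the same counter, so the final value reflects the work of every thread.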

Threads - Simple code example

from threading import Thread

def add(a, b):
    print(a + b)

t = Thread(target=add, args=(1, 2))
t.start()
t.join()  # wait for the thread to finish

Threads - ThreadPoolExecutor small tasks

from concurrent.futures import ThreadPoolExecutor
from utils import is_prime
with ThreadPoolExecutor() as exe:
    exe.map(is_prime, range(100000))

Threads - ThreadPoolExecutor large tasks

from concurrent.futures import ThreadPoolExecutor
from utils import cpu_task
with ThreadPoolExecutor() as exe:
    exe.map(cpu_task, [7, 7, 7])
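
The utils module imported in the examples above is not shown in these slides. The sketch below assumes what is_prime and cpu_task might look like (both bodies are hypothetical stand-ins) and shows how to collect the results of exe.map, which otherwise returns a lazy iterator.

```python
from concurrent.futures import ThreadPoolExecutor

def is_prime(n):
    """Hypothetical stand-in for utils.is_prime: trial division."""
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True

def cpu_task(n):
    """Hypothetical stand-in for utils.cpu_task: a busy loop of 10**n steps."""
    total = 0
    for i in range(10 ** n):
        total += i
    return total

with ThreadPoolExecutor() as exe:
    # map() yields results in input order; list() collects them
    flags = list(exe.map(is_prime, range(20)))
    sums = list(exe.map(cpu_task, [3, 3, 3]))

primes = [n for n, flag in enumerate(flags) if flag]
print(primes)
print(sums)
```

The exponent is kept small here so the sketch finishes quickly; the slides use 7.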

Processes

  1. OS processes
  2. Can use multiple cores
  3. Isolated memory
    1. Globals are of no use
    2. Uses more memory
    3. Communication (data transfer) is slower
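
The "isolated memory" point can be sketched like this. The names counter, bump and run_child are made up for the illustration. The child process changes its own copy of the global, so the value has to be sent back explicitly, here through a multiprocessing.Queue.

```python
from multiprocessing import Process, Queue

counter = 0  # each process gets its own copy of this global

def bump(queue):
    global counter
    counter += 1
    queue.put(counter)   # send the child's value back explicitly

def run_child():
    queue = Queue()
    p = Process(target=bump, args=(queue,))
    p.start()
    child_value = queue.get()   # data transfer between processes
    p.join()
    return child_value

if __name__ == "__main__":
    child_value = run_child()
    print(child_value)   # value seen inside the child
    print(counter)       # the parent's copy of the global
```

The parent's counter stays unchanged because the increment happened in the child's memory, which is why globals are of no use for sharing state between processes.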

Processes - Simple code example

from multiprocessing import Process

def add(a, b):
    print(a + b)

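# The guard is required on platforms that spawn a fresh interpreter
if __name__ == "__main__":
    p = Process(target=add, args=(1, 2))
    p.start()
    p.join()  # wait for the child process to exit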

Processes - ProcessPoolExecutor small tasks

from concurrent.futures import ProcessPoolExecutor
from utils import is_prime
if __name__ == "__main__":
    with ProcessPoolExecutor() as exe:
        exe.map(is_prime, range(100000))

Processes - ProcessPoolExecutor large tasks

from concurrent.futures import ProcessPoolExecutor
from utils import cpu_task
if __name__ == "__main__":
    with ProcessPoolExecutor() as exe:
        exe.map(cpu_task, [7, 7, 7])

Coroutines

  1. Functions defined with async def are coroutine functions; calling one returns a coroutine.
  2. The async / await syntax is used to define and chain them
  3. but a coroutine cannot execute without a runner / runtime (an event loop)
  4. Good for IO-bound tasks
  5. Cannot make use of multiple CPU cores at the same time
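
A minimal sketch of the "needs a runner" point: calling an async function only builds a coroutine object, and nothing executes until a runner such as asyncio.run drives it. The variable names are illustrative.

```python
import asyncio
import inspect

async def add(a, b):
    return a + b

coro = add(1, 2)                          # nothing runs yet; this only builds a coroutine object
is_coroutine = inspect.iscoroutine(coro)
result = asyncio.run(coro)                # the runner drives the coroutine to completion

print(is_coroutine, result)
```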

Coroutines - Code example

import asyncio

async def add(a, b):
    print(a + b)

asyncio.run(add(1, 2))

Coroutines - Runners

  1. asyncio - Default
  2. uvloop - Powered by libuv
  3. trio, AnyIO, etc.

Triggering coroutines

  1. To start from synchronous code, use asyncio.run(func(arg1, arg2))
  2. To run a coroutine and wait for its result, use await func(arg1, arg2)
  3. To schedule one in the background, use asyncio.create_task(func(arg1, arg2))
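
The three ways of triggering a coroutine can be sketched together. The work coroutine and its delays are made up for the example, with asyncio.sleep standing in for real IO.

```python
import asyncio

async def work(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    background = asyncio.create_task(work("background", 0.05))  # scheduled, not awaited yet
    first = await work("first", 0.01)    # runs next; main() pauses until it finishes
    second = await background            # collect the background task's result
    return [first, second]

results = asyncio.run(main())            # entry point: starts the event loop
print(results)
```

The background task makes progress while main() is waiting on the first call, so awaiting it later only collects the result.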

Examples of IO bound tasks

  • Keyboard / mouse inputs
  • Reading / writing files
  • Requesting data over network

Why are IO-bound tasks good for async programming?

While waiting for external events like keyboard events or network calls to succeed, you can make use of the CPU time to run other tasks.
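
A rough sketch of this overlap, assuming asyncio.sleep as a stand-in for a network call (fake_request is a hypothetical name). Three waits of 0.2 seconds each are started together, so the total elapsed time should be close to one wait rather than the sum of all three.

```python
import asyncio
import time

async def fake_request(seconds):
    # asyncio.sleep stands in for a network call; the event loop is free while it waits
    await asyncio.sleep(seconds)
    return seconds

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_request(0.2), fake_request(0.2), fake_request(0.2)
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, round(elapsed, 1))
```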

Coroutine implementations

  1. epoll on Linux
  2. kqueue on macOS and BSD
  3. IOCP on Windows

Event loops are built on these OS-level IO notification mechanisms, which in turn rely on OS and hardware-level interrupts.
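
Python exposes some of these mechanisms through the standard selectors module. The sketch below is only an illustration of readiness notification, not how asyncio itself is written: DefaultSelector picks the most efficient mechanism available on the platform (for example epoll or kqueue). IOCP is not available through selectors; on Windows, asyncio reaches it through its proactor event loop instead.

```python
import selectors
import socket

sel = selectors.DefaultSelector()     # picks epoll, kqueue, etc. depending on the platform
backend = type(sel).__name__

reader, writer = socket.socketpair()
reader.setblocking(False)
sel.register(reader, selectors.EVENT_READ)

writer.send(b"ping")                  # makes the reader socket readable
events = sel.select(timeout=1)        # blocks until the OS reports readiness
ready = [key.fileobj for key, _mask in events]
data = reader.recv(4)

sel.unregister(reader)
sel.close()
reader.close()
writer.close()

print(backend, data)
```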

Kinds of execution

  1. Sequential execution
  2. Parallel execution
  3. Interleaved execution

Sequential

Most programs are sequential by default and run on a single CPU core unless threads, processes, or coroutines are used.

Parallel

  • Different processes run in parallel on different CPU cores
  • Threads can run in parallel on multiple cores but are restricted by the GIL

Interleaved

  • Coroutines run on a single CPU core and switch to other work while waiting for IO-bound tasks to complete
  • Threads also follow a similar approach because of the GIL
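
Interleaving can be made visible with a small sketch. The worker coroutine and the log list are made up for the example. Each worker records a step and then yields to the event loop with asyncio.sleep(0), so the two workers take turns on a single thread.

```python
import asyncio

log = []

async def worker(name):
    for step in range(2):
        log.append(f"{name}{step}")
        await asyncio.sleep(0)   # yield control back to the event loop

async def main():
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
print(log)
```

The log alternates between A and B, even though no second thread or process is involved.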

Async generators

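import asyncio

async def aenumerate(it):
    for i, x in enumerate(it):
        yield i, x

async def main():
    # async for is only valid inside an async function
    async for i, x in aenumerate(["a", "b"]):
        print(i, x)

asyncio.run(main())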

Async pitfalls

Not using asyncio.run

await func(arg1, arg2)        # incorrect

asyncio.run(func(arg1, arg2)) # correct

Always run the entry-point function using asyncio.run.

await outside a function

await func(arg1, arg2)  # incorrect: SyntaxError outside an async function

Use await inside an async function.

async def f1(arg1, arg2):
    await func(arg1, arg2)

Not awaiting

# incorrect
user = User.objects.aget(pk=1)

User.objects.aget returns a coroutine which needs to be awaited

# correct
user = await User.objects.aget(pk=1)

Chaining - incorrect

User.objects.aget(pk=1).name

User.objects.aget returns a coroutine which needs to be awaited first

(await User.objects.aget(pk=1)).name
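
The same mistake can be reproduced without Django. In this sketch, aget is a hypothetical stand-in for User.objects.aget that returns a fake user object. It shows why the chained attribute access fails on the coroutine and works once the coroutine has been awaited.

```python
import asyncio
from types import SimpleNamespace

async def aget(pk):
    """Hypothetical stand-in for User.objects.aget; returns a fake user."""
    await asyncio.sleep(0)
    return SimpleNamespace(pk=pk, name="alice")

async def main():
    pending = aget(pk=1)          # a coroutine object, not a user
    try:
        pending.name              # coroutine objects have no .name attribute
        error = None
    except AttributeError as exc:
        error = type(exc).__name__
    pending.close()               # discard the unused coroutine to avoid a "never awaited" warning

    name = (await aget(pk=1)).name   # await first, then access the attribute
    return error, name

error, name = asyncio.run(main())
print(error, name)
```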

Django async pitfalls

Django async pitfalls - 1

async def view1(request):
    # incorrect: sync ORM call inside an async view
    User.objects.get(pk=1)

async def view1(request):
    # correct
    await User.objects.aget(pk=1)

Django async pitfalls - 2

def view2(request):
    # incorrect: await is a SyntaxError inside a regular def
    await User.objects.aget(pk=1)

def view2(request):
    # correct
    User.objects.get(pk=1)

Django async pitfalls - 3

async def view3(request):
    users = User.objects.filter(is_superuser=False)
    # incorrect
    for user in users:
        print(user)

async def view3(request):
    users = User.objects.filter(is_superuser=False)
    # correct
    async for user in users:
        print(user)

Django async pitfalls - 4

async def view4(request):
    # incorrect: sync_orm_func is not awaitable and blocks the event loop
    await sync_orm_func(arg1, arg2)

from asgiref.sync import sync_to_async

async def view4(request):
    # correct
    func = sync_to_async(sync_orm_func)
    await func(arg1, arg2)
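
Outside Django, a similar idea can be sketched with the standard library alone. This is not asgiref's sync_to_async: it swaps in asyncio.to_thread (Python 3.9+), which runs a blocking callable in a worker thread so the event loop is not blocked. The blocking_lookup function is a hypothetical stand-in for sync_orm_func.

```python
import asyncio
import time

def blocking_lookup(pk):
    """Hypothetical blocking function standing in for sync_orm_func."""
    time.sleep(0.05)              # simulates a blocking database call
    return {"pk": pk}

async def view(pk):
    # asyncio.to_thread runs the blocking call in a worker thread,
    # so the event loop stays free while it waits
    return await asyncio.to_thread(blocking_lookup, pk)

row = asyncio.run(view(1))
print(row)
```

Inside a Django project, sync_to_async remains the appropriate wrapper for ORM calls.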

Django async pitfalls - 5

def view5(request):
    # incorrect: returns a coroutine that is never awaited
    async_orm_func(arg1, arg2)

from asgiref.sync import async_to_sync

def view5(request):
    # correct
    func = async_to_sync(async_orm_func)
    func(arg1, arg2)

Async ORMs

  1. Django ORM - partial support
  2. Piccolo ORM - Django-like
  3. SQLAlchemy
  4. asyncpg - Not an ORM

Async API frameworks

  1. FastAPI
  2. Litestar
  3. Starlette - Base used by FastAPI

aiolimiter