Recently at work, I had to implement a fast way to process an entire index’s worth of items from Elasticsearch. Because processing each item takes one or more network calls (e.g. reindexing, hydration), blocking on those requests for each and every item is very inefficient. Instead, we want the process to perform other work while a network request is in flight. Green (userspace) threads are a good fit for this kind of workload: they let us cheaply overlap the processing of many items across blocking I/O boundaries.
I was wary of system threads for this work because the OS scheduler would have to juggle tens of thousands of concurrent item-processing tasks, all of which yield to each other quickly, and the context-switch overhead of kernel threads would destroy performance.
Because most of our processing scripts are written in Python 3, asyncio is perfect for this use case. Python’s asyncio (or in common parlance, async/await) lets us schedule arbitrary concurrent workloads that perform I/O without incurring much scheduling or blocking overhead, by multiplexing them on an event loop. (If you are interested in event loops, you have to see Ryan Dahl’s original Node.js talk at JSConf 2009.)
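As a toy illustration (the names here are mine, not from our actual scripts), the event loop lets many simulated “network calls” overlap instead of running back to back:

```python
import asyncio

async def process_item(item_id):
    # Stand-in for a blocking network call (e.g. a hydration request).
    # While this task sleeps, the event loop runs the other tasks.
    await asyncio.sleep(0.01)
    return item_id * 2

async def main():
    # All 100 "requests" overlap, so total wall time is roughly one
    # sleep (~0.01s), not 100 sequential sleeps. gather() preserves
    # the order of its arguments in the returned list.
    return await asyncio.gather(*(process_item(i) for i in range(100)))

results = asyncio.run(main())
```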
The first step is a simple asynchronous generator over the Search / Scroll API, fetching, say, 10000 items at a time (as you’ll see later, this turns out to be a bad idea because of JSON parsing). While this doesn’t sound like it accomplishes much, it already has substantial benefits whenever downstream processing incurs any I/O. For example, if we need to query an API for each item, we can make 10000 requests concurrently simply because we’re running under async/await. Even better, downstream code can control the size of batched API requests (e.g. batch 100 items for 100 concurrent external requests).
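A minimal sketch of such a generator might look like the following. It assumes `es` is an async Elasticsearch client exposing `search` and `scroll` coroutines (e.g. `AsyncElasticsearch` from elasticsearch-py); this is not our production code, just the shape of the idea:

```python
async def scan_index(es, index, query, page_size=10000):
    """Async generator yielding every hit in `index`, one Scroll API
    page at a time. `es` is assumed to be an async client with
    `search` and `scroll` coroutines."""
    resp = await es.search(index=index, body=query,
                           scroll="2m", size=page_size)
    while True:
        hits = resp["hits"]["hits"]
        if not hits:
            break  # an empty page means the scroll is exhausted
        for hit in hits:
            yield hit
        # Fetch the next page, keeping the scroll context alive.
        resp = await es.scroll(scroll_id=resp["_scroll_id"], scroll="2m")
```

Downstream code can then `async for hit in scan_index(...)` and batch hits however it likes.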
This obviously has a limitation: Elasticsearch is not being queried while processing is underway. Ideally, we would run multiple searches with Elasticsearch’s Slice API to maximise bandwidth utilisation by overlapping search latency. (For reference, a search with size 10000 took several seconds on our infrastructure.)
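The Slice API partitions a scroll by adding a `slice` clause with an `id` and a `max` count to the query body; each sliced query can then be scrolled by its own concurrent task. A small helper to generate the slice bodies:

```python
def sliced_queries(base_query, n_slices):
    """Partition one search into `n_slices` independent Slice API
    query bodies. Each slice {"id": i, "max": n_slices} covers a
    disjoint subset of the index, so the slices can be scrolled
    concurrently without overlap."""
    for i in range(n_slices):
        q = dict(base_query)
        q["slice"] = {"id": i, "max": n_slices}
        yield q
```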
Initially, I designed the entire system around asynchronous generators over Elasticsearch slices. In essence, I partitioned the search into independent slice requests and created an asynchronous generator for each of them. A top-level async generator would then await whichever of those generators yielded first, similar to how “asyncio.as_completed()” yields results as they arrive.
However, Python 3.7 has limited infrastructure for combining asynchronous generators in this way, so a producer-consumer queue is a more sensible approach. The best I could do with asynchronous generators was “asyncio.wait()” over one “agen.__anext__()” task per generator, but the “(done, pending)” sets it returns don’t tell you which asynchronous generator each finished task came from, so you must maintain that mapping yourself (a hash map works, but this is the point where you realise not to go too far).
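For the curious, the hash-map version is roughly this (my reconstruction of the approach, not the original code); it works, but the bookkeeping is exactly the clutter that pushed me towards a queue:

```python
import asyncio

async def merge(*agens):
    """Yield items from several async generators as each becomes
    ready, by racing one __anext__() task per generator. The
    task -> generator dict is the bookkeeping mentioned above."""
    task_to_gen = {asyncio.ensure_future(g.__anext__()): g for g in agens}
    while task_to_gen:
        done, _pending = await asyncio.wait(
            task_to_gen, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            gen = task_to_gen.pop(task)
            try:
                item = task.result()
            except StopAsyncIteration:
                continue  # this generator is exhausted
            # Re-arm this generator's next fetch before yielding.
            task_to_gen[asyncio.ensure_future(gen.__anext__())] = gen
            yield item
```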
I used a queue sentinel to signal end-of-work, and a watchdog timer to ensure that tasks that have raised exceptions do not hang the program.
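A simplified sketch of that producer-consumer shape, with a `wait_for` timeout standing in for the watchdog (the real implementation differs; the names and timeout value here are illustrative):

```python
import asyncio

SENTINEL = object()  # queue marker: the producer is finished

async def producer(queue, items):
    for item in items:
        await queue.put(item)
    await queue.put(SENTINEL)

async def consumer(queue, timeout=5.0):
    """Drain the queue until the sentinel arrives. The wait_for
    timeout acts as a watchdog: if the producer died without
    enqueueing the sentinel, we fail fast instead of hanging."""
    results = []
    while True:
        item = await asyncio.wait_for(queue.get(), timeout=timeout)
        if item is SENTINEL:
            break
        results.append(item)
    return results

async def run_pipeline(items):
    queue = asyncio.Queue(maxsize=100)  # bounded, for backpressure
    prod = asyncio.ensure_future(producer(queue, items))
    results = await consumer(queue)
    await prod  # surface any exception the producer raised
    return results
```

The bounded queue also gives backpressure: a fast producer blocks on `put()` until the consumer catches up.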
This is on a test workload that dumps items to disk, which doesn’t benefit much from concurrency since no downstream async calls are made; it is representative of a no-op workload. As you can see, most of the time is spent parsing JSON, which means much of the event loop overhead is hidden. I will talk about the tricks I used to improve performance at a later time!