Using Rustlang's Async Tokio Runtime for CPU-Bound Tasks

This article was originally published in The New Stack on January 14, 2022 and is being republished here with permission. 

Despite the term async and its association with asynchronous network I/O, this blog post argues that the Tokio runtime at the heart of the Rust async ecosystem is also a good choice for CPU-heavy jobs such as those found in analytics engines.

What is Tokio?

Rust has built-in support for an asynchronous (async) programming model, similar to languages such as JavaScript.

To take full advantage of multiple cores and async I/O, a runtime must be used, and while several alternatives are available to the Rust community, Tokio is the de facto standard. Tokio.rs describes it as: “an asynchronous runtime for the Rust programming language. It provides the building blocks needed for writing network applications.”

While this description emphasizes Tokio’s use for network communications, the runtime can be used for other purposes, as we will explore below.
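If you have not used Tokio before, here is a minimal sketch of what a program on the runtime looks like (the task bodies and sleep duration are illustrative; it assumes the tokio crate with the full feature set enabled):

use std::time::Duration;

// `#[tokio::main]` starts a multi-threaded Runtime and runs the async
// main function on it
#[tokio::main]
async fn main() {
    // Spawn two tasks that run concurrently on the Runtime's thread pool
    let a = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(10)).await;
        1
    });
    let b = tokio::spawn(async { 41 });

    // `.await` suspends this task until the results are ready
    println!("{}", a.await.unwrap() + b.await.unwrap());
}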

Why use Tokio for CPU tasks?

It turns out that modern analytics engines invariably need to handle client requests from a network, as well as use a network to communicate with object storage systems such as AWS S3, GCP Cloud Storage and Azure Blob Storage.

Thus, any such system implemented in Rust will end up using Tokio for its network and at least part of its storage I/O (yes, I know, initially Tokio async file I/O is not really async, but it is coming soon).

Analytic systems also do CPU-heavy computations, which I define as processing data in a way that consumes large amounts of CPU for storage reorganization, precomputing various indices, or directly answering client queries. These computations are typically broken into many independent chunks, which I will call “tasks,” that are then run in parallel to take advantage of the many cores available in modern CPUs.

Figuring out which tasks to run when is typically done by something called a “task scheduler,” which maps tasks to available hardware cores / operating system threads.

There are years of academic and industrial work on various types of task schedulers, worker pools, thread pools and the like.

My experience having worked with (and implemented, I am somewhat ashamed to admit) several custom task schedulers is that they are easy to get working initially (say, 99.9% of the time) but then require lots (lots!) of time to get the corner cases right (fast shutdown, task cancellation, draining, etc.). They are also notoriously hard to test due to their use of lower-level threading primitives, and race conditions abound. I wouldn’t recommend doing it.

Thus, when casting around for a task scheduler in the Rust ecosystem, as we did for InfluxDB IOx and DataFusion, you naturally end up at Tokio, and it looks pretty good:

  1. You already have Tokio (no new dependencies).
  2. Tokio implements a sophisticated work-stealing scheduler.
  3. Tokio effectively has built-in language support for continuations (async/await), and many relatively mature libraries for streams, async locking, channels, cancellation, etc.
  4. Tokio is famously well-tested and heavily used across the Rust ecosystem.
  5. Tokio typically keeps running tasks and the futures they run on the same executor thread, which is great for cache locality.
  6. Tokio is well-documented, actively maintained and getting better all the time. (Tokio console was announced while I was writing this blog).

Thus, it is a no-brainer to use Tokio as a task scheduler for CPU-heavy tasks, right? WROOOOOOOONG!

Common objections to using Tokio

It turns out that whether to use Tokio in this way was a fairly heated topic, and I would say not everyone is 100% convinced yet, hence this blog post. We worried a lot about this question in the early days of DataFusion and InfluxDB IOx. Here are some common objections:

The Tokio docs say not to:

Older versions of Tokio docs (e.g., 1.10.0) included the (in my mind) famous admonition:

“If your code is CPU-bound and you wish to limit the number of threads used to run it, you should run it on another thread pool such as Rayon.”

I believe this wording caused significant confusion both within our team and in the broader Rust community. Many people read it to mean that a Tokio Runtime should never be used for CPU-bound tasks. The key point is actually that the same Runtime instance (the same thread pool) should not be used for both I/O and CPU, and we have subsequently clarified the intent of the docs (gory details on the PR).

As an aside, the Tokio docs suggest using Rayon for CPU-bound tasks. Rayon is a great choice for many applications, but it has no support for async, so if your code has to do any I/O at all, you will have to straddle the painful sync/async boundary. I also found it challenging to map a pull-based execution model, where a task must wait for all its inputs to be ready before it can run, onto Rayon.
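To illustrate that boundary, here is a hedged sketch (the function and its doubling “work” are made up) of the typical pattern for calling into Rayon from async code: run the CPU work on Rayon’s pool and ship the result back over a channel.

/// Sketch: straddling the sync/async boundary with Rayon by running
/// CPU work on Rayon's thread pool and awaiting the result via a
/// oneshot channel
async fn double_on_rayon(v: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = tokio::sync::oneshot::channel();
    rayon::spawn(move || {
        let out: Vec<u64> = v.into_iter().map(|x| x * 2).collect();
        // Fails only if the receiving side was dropped
        tx.send(out).ok();
    });
    rx.await.expect("rayon task panicked")
}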

Tail latencies will stalk you

Wise people say, “Using Tokio for CPU-bound work will increase your request tail latencies, which is unacceptable.” But wait! You might say, “Tail latencies? I am writing a database and that sounds like an academic problem for web servers that are under high load…”

Not really: Consider the liveness check, de rigueur these days for systems deployed using container orchestration systems (ahem, Kubernetes). The check that your process is behaving well is often an HTTP request to something like /health. If that request sits on a task queue somewhere because Tokio is efficiently using your CPU to chew through reams of data-processing tasks, Kubernetes does not get the required “Everything is Fine” response and kills your process.

This line of reasoning results in the classic conclusion that since tail latencies are critical, you can’t use Tokio for CPU-heavy tasks.

However, as the Tokio docs counsel, what is really important to avoid getting nuked by Kubernetes and friends while fully saturating the CPU is to use a separate thread pool - one for “latency is important” tasks, like responding to /health, and one for CPU-heavy tasks. The optimal number of threads for these thread pools varies by your need and is a good topic for another separate article.

Perhaps thinking of a Tokio Runtime as a sophisticated thread pool makes the idea of using different Runtime instances more palatable; we demonstrate how to do so with the dedicated executor below.

High per-task overhead

“But wait!” I hear you say (or everyone hears you say on Hacker News), “Tokio has high per-task overhead.” I am not at all surprised people can make thread pools that crank through tiny tasks faster than Tokio.

However, I haven’t yet seen such a system I would trust for my production workloads, nor one that has as robust ecosystem support.

Thankfully, for many workloads, the per-task overhead can be amortized using “vectorized processing.” This is a fancy way of saying each task processes thousands of rows at a time rather than one. You can’t go crazy, of course; you do need to break your work into reasonably sized chunks, and you can’t amortize all workloads. However, for all the instances my applications cared about, the Tokio task overhead is lost in the noise.
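As a concrete (hedged) illustration of this amortization, here is a sketch where each Tokio task processes a whole batch of rows rather than a single row (the batch size and per-row “work” are made up):

/// Sketch: one Tokio task per batch of rows, so the per-task spawn
/// overhead is divided across thousands of rows
async fn process_all(rows: Vec<u64>) -> u64 {
    const BATCH_SIZE: usize = 4096; // illustrative; tune for your workload

    let handles: Vec<_> = rows
        .chunks(BATCH_SIZE)
        .map(|batch| {
            let batch = batch.to_vec();
            tokio::task::spawn(async move {
                // Stand-in for real per-row processing
                batch.iter().map(|v| v * 2).sum::<u64>()
            })
        })
        .collect();

    let mut total = 0;
    for handle in handles {
        total += handle.await.expect("task panicked");
    }
    total
}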

How to use Tokio for CPU-bound tasks?

So let’s pretend I convinced you it might be OK to use Tokio for CPU-heavy work. How do you do so?

First, critically, your code needs to follow the adage “async code should never spend a long time without reaching an .await,” as explained in Alice Ryhl’s post. This is to give the scheduler a chance to schedule something else, steal work, etc.

Of course, “a long time” depends on your application; Ryhl recommends 10 to 100 microseconds when optimizing for response tail latencies. I think 10 to 100 milliseconds is also fine for tasks when optimizing for CPU. However, since my estimated per-task Tokio overhead is in the ~10 nanosecond range, it is nearly impossible to even measure Tokio Runtime overhead with 10 millisecond tasks.
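For example, one way to follow the adage in a long CPU loop is to yield back to the scheduler periodically with tokio::task::yield_now (a sketch; the chunk size is arbitrary):

/// Sketch: sum a large buffer in chunks, reaching an `.await` between
/// chunks so the scheduler can run other tasks on this thread
async fn checksum(data: &[u8]) -> u64 {
    let mut sum = 0u64;
    for chunk in data.chunks(64 * 1024) {
        sum = chunk.iter().fold(sum, |acc, &b| acc.wrapping_add(b as u64));
        // Give the scheduler a chance to do something else
        tokio::task::yield_now().await;
    }
    sum
}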

Second, run your tasks on a separate Runtime instance. How do you do that? Glad you asked.

Dedicated executor

Here is a simplified version of how we run tasks on a separate Tokio Runtime in InfluxDB IOx. (For more details, see this repo.)

use std::sync::Arc;
// The IOx code uses parking_lot::Mutex; std::sync::Mutex also works if
// you add `.unwrap()` to the `lock()` calls
use parking_lot::Mutex;

pub struct DedicatedExecutor {
    state: Arc<Mutex<State>>,
}

/// Runs futures (and any `tasks` that are `tokio::task::spawn`ed by
/// them) on a separate Tokio Executor
struct State {
    /// Channel for requests -- the dedicated executor takes requests
    /// from here and runs them.
    requests: Option<std::sync::mpsc::Sender<Task>>,

    /// Thread which has a different Tokio runtime
    /// installed and spawns tasks there
    thread: Option<std::thread::JoinHandle<()>>,
}

impl DedicatedExecutor {
    /// Creates a new `DedicatedExecutor` with a dedicated Tokio
    /// executor that is separate from the threadpool created via
    /// `[tokio::main]`.
    pub fn new(thread_name: &str, num_threads: usize) -> Self {
        let thread_name = thread_name.to_string();

        let (tx, rx) = std::sync::mpsc::channel::<Task>();

        let thread = std::thread::spawn(move || {
            // Create a new Runtime to run tasks
            let runtime = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .thread_name(&thread_name)
                .worker_threads(num_threads)
                // Lower OS priority of worker threads to prioritize
                // the main runtime (helper not shown)
                .on_thread_start(move || set_current_thread_priority_low())
                .build()
                .expect("Creating Tokio runtime");

            // Pull task requests off the channel and spawn them on
            // the new Runtime
            runtime.block_on(async move {
                while let Ok(task) = rx.recv() {
                    tokio::task::spawn(async move {
                        task.run().await;
                    });
                }
            });
        });

        let state = State {
            requests: Some(tx),
            thread: Some(thread),
        };

        Self {
            state: Arc::new(Mutex::new(state)),
        }
    }
}

This code creates a new std::thread, which creates a separate multi-threaded Tokio Runtime to run tasks and then reads tasks from a Channel and spawns them on the new Runtime.

Note: The new thread is key. If you try to create a new Runtime on the main thread or one of the threads Tokio has created, you will get an error, as there is already a Runtime installed.
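The snippets here also assume a small Task wrapper, roughly like the following sketch (the version in the repo linked above is more elaborate):

use std::{future::Future, pin::Pin};

/// A unit of work sent to the dedicated executor: a boxed future that
/// produces no output directly (results flow back over a channel)
struct Task {
    fut: Pin<Box<dyn Future<Output = ()> + Send>>,
}

impl Task {
    async fn run(self) {
        self.fut.await
    }
}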

Here is the corresponding code to send a task to this second Runtime.

impl DedicatedExecutor {
    /// Runs the specified Future (and any tasks it spawns) on the
    /// `DedicatedExecutor`.
    pub fn spawn<T>(&self, task: T) -> Job<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let (tx, rx) = tokio::sync::oneshot::channel();

        let fut = Box::pin(async move {
            let task_output = task.await;
            // Send the output back to the main executor; this fails
            // only if the receiving `Job` was dropped
            tx.send(task_output).ok();
        });
        let mut state = self.state.lock();
        let task = Task { fut };

        if let Some(requests) = &mut state.requests {
            // would fail if someone has started shutdown
            requests.send(task).ok();
        } else {
            warn!("tried to schedule task on an executor that was shutdown");
        }

        Job { rx }
    }
}

Job

The code above uses a wrapper around a Future called Job that handles transferring the results from the dedicated executor back to the main executor, which looks like:

use pin_project::pin_project;
use tokio::sync::oneshot::{error::RecvError, Receiver};

// The full version uses `#[pin_project(PinnedDrop)]` plus a `cancel`
// field to cancel the task when the `Job` is dropped; this simplified
// version omits cancellation
#[pin_project]
pub struct Job<T> {
    #[pin]
    rx: Receiver<T>,
}

impl<T> Future for Job<T> {
    // `RecvError` here means the task was dropped (e.g., during
    // shutdown) before it could send its output
    type Output = Result<T, RecvError>;

    fn poll(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        let this = self.project();
        this.rx.poll(cx)
    }
}
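To tie it together, here is a hedged usage sketch (expensive_computation stands in for real CPU-heavy work):

#[tokio::main] // main Runtime handles network I/O and stays responsive
async fn main() {
    // Dedicated Runtime with four worker threads for CPU-heavy work
    let exec = DedicatedExecutor::new("cpu-pool", 4);

    let job = exec.spawn(async {
        expensive_computation() // runs on the dedicated Runtime
    });

    // Await the result back on the main Runtime
    let result = job.await.expect("job dropped or executor shut down");
    println!("result: {result}");
}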

And that’s it! You can find all the code in this GitHub gist.

Next steps

InfluxData is a big believer and supporter of open source. InfluxDB IOx makes significant use of and contributions back to the Rust implementation of the Apache Arrow columnar memory format and the Apache Arrow DataFusion query engine, which uses Tokio for its query plan execution.

We love community contributions, both docs and code. DataFusion and Arrow both have vibrant communities. Feel free to come and say hi.