Async runtimes in Rust
While working on another Rust project, I was looking for an async runtime that satisfied 3 simple conditions:
uses a single-threaded executor (so never spawning another thread)
allows async reading from stdin and writing to stdout
has some timer/sleep futures
These are not hard requirements for me, but I presumed they were straightforward enough for most popular runtimes to handle, which turns out not to be the case. The first thing I noticed is that many popular runtimes like tokio or smol are built with multi-threading in mind, and so often use background threads to handle work. Another problem is that async runtimes require some OS primitives to work (mainly an IO polling mechanism), and because these runtimes have to support all popular operating systems, it becomes very difficult to provide abstractions that are both performant and consistent across the board.
For instance, smol provides the Async type, which is an abstraction over any type that implements AsFd and gives it non-blocking IO. But if you look at the fine print:
Async supports all networking types, as well as some OS-specific file descriptors like timerfd and inotify.
However, do not use Async with types like File, Stdin, Stdout, or Stderr because all operating systems have
issues with them when put in non-blocking mode.
Similarly, tokio implements some async operations by blocking on a separate thread and sending back the result:
fs operations
dns resolution through ToSocketAddrs
writing to Stdout or Stderr
reading from Stdin
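The workaround for these is essentially the classic "do the blocking call on a thread and send the result back" pattern. Here is a minimal sketch of the idea (not tokio's actual code; the helper name read_line_async is mine):

use std::thread;

// Sketch: a blocking stdin read offloaded to a thread, with the result
// delivered back to the async world through a oneshot channel.
fn read_line_async() -> futures::channel::oneshot::Receiver<std::io::Result<String>> {
    let (tx, rx) = futures::channel::oneshot::channel();
    thread::spawn(move || {
        let mut line = String::new();
        let res = std::io::stdin().read_line(&mut line).map(|_| line);
        let _ = tx.send(res); // receiver may have been dropped; ignore the error
    });
    rx // awaiting this does not block the executor
}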
I found these limitations intriguing and decided to try to understand them by building my own async runtime that would perfectly fit my very specific needs!
If you want to try this for yourself but don't know much about how async Rust works, I recommend starting out with the mini-tokio tutorials and this video tutorial.
The plan
An async runtime is usually composed of 2 loosely coupled components: an executor and a reactor.
the executor runs any futures which are able to make progress.
the reactor notifies the executor when a future is able to make progress.
The executor
A single-threaded executor is also sometimes referred to as an event loop. It roughly looks like this:
pub fn run(&mut self) {
    while !self.tasks.is_empty() {
        // run every task that can currently make progress
        for task in self.tasks.drain() {
            task.run();
        }
        // block until the OS reports that some IO has completed
        self.wait_for_io();
    }
}
On each iteration, we first run all available tasks, and then wait for any IO requests to resolve, which will in turn allow more tasks to be able to make progress - rinse and repeat until there are no more tasks.
It's important to note here that tasks should never block; instead, if they need to perform some IO, they should register with the reactor and return. This is true for any async runtime, but it is particularly important with event loops, because otherwise none of your other tasks will get a chance to run.
Tasks can also "block" just by doing too much work, which is why event loops shouldn't be used for compute-heavy purposes (I'm looking at you, nodejs).
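To make the "never block" rule concrete, here is a minimal sketch of a cooperative yield future (the same idea as tokio's task::yield_now; the names here are mine). It returns Pending exactly once and wakes itself immediately, so the executor gets a chance to run other tasks in between:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Hypothetical helper, not part of the runtime built in this post.
pub struct YieldNow {
    yielded: bool,
}

pub fn yield_now() -> YieldNow {
    YieldNow { yielded: false }
}

impl Future for YieldNow {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // wake ourselves right away so the executor reschedules us
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

A compute-heavy task can sprinkle yield_now().await inside its hot loop to give the other tasks a turn.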
The reactor
A reactor is a registry for all IO operations that you are waiting for. Whenever you want to perform IO, you can register your request with the reactor and then yield control back to the executor. The reactor will keep track of your request and will reschedule your task when it is able to continue.
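In code, the shape of such a registry might look something like this (a sketch only - Reactor, register and wait_for_io are names I made up, and the real implementation comes later):

use std::collections::HashMap;
use std::task::Waker;

// Sketch: map each in-flight IO request (identified by some id)
// to the waker of the task waiting on it.
#[derive(Default)]
pub struct Reactor {
    waiting: HashMap<u64, Waker>,
}

impl Reactor {
    // called by a future when it submits an IO request
    pub fn register(&mut self, id: u64, waker: Waker) {
        self.waiting.insert(id, waker);
    }

    // called by the executor when it runs out of active tasks:
    // block on the OS until something completes, then wake those tasks
    pub fn wait_for_io(&mut self) {
        // for each completed request id reported by the OS:
        // if let Some(waker) = self.waiting.remove(&id) { waker.wake(); }
    }
}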
The implementation of a reactor depends on some OS primitive for async IO. I am mostly familiar with Linux, so that is my only target for this project, which should make the implementation a lot easier because I don't have to support Windows or macOS and their respective APIs.
Linux actually supports 5 mechanisms for async IO: aio, select, poll, epoll, and the latest one, io_uring. While the first three are included in POSIX, they are not particularly efficient. epoll, which is specific to Linux, offers better performance but has limitations with certain file descriptors, particularly regular files. Both tokio and smol use epoll, which is part of the reason for the limitations I mentioned earlier - these OS abstractions are leaky and require workarounds. io_uring was built to solve this problem: it is uniform across file descriptors, and it's also currently the fastest way to do IO on Linux thanks to the design of its API.
So I decided to try something new and use io_uring.
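To get a feel for the API before wiring it into the runtime, here is roughly what a one-shot blocking read looks like with the io-uring crate (a standalone sketch, separate from the runtime code below):

use io_uring::{opcode, types, IoUring};

fn main() -> std::io::Result<()> {
    // a ring with room for 8 submission entries
    let mut ring = IoUring::new(8)?;
    let mut buf = vec![0u8; 1024];

    // describe a read from stdin (fd 0) into our buffer
    let read = opcode::Read::new(types::Fd(0), buf.as_mut_ptr(), buf.len() as _)
        .build()
        .user_data(42); // tag to recognize the completion later

    // safety: `buf` must stay alive until the kernel completes the read
    unsafe { ring.submission().push(&read).expect("submission queue is full") };

    // submit the request and block until at least one completion arrives
    ring.submit_and_wait(1)?;

    let cqe = ring.completion().next().expect("completion queue is empty");
    assert_eq!(cqe.user_data(), 42);
    println!("read {} bytes", cqe.result());
    Ok(())
}

The user_data tag is the piece a reactor can use to map completions back to the tasks (and wakers) that submitted them.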
Building the executor
First thing I did was create the task abstraction. A task consists of an inner future and some mechanism for scheduling it, or more specifically, marking it as ready to make progress. From now on, I'll refer to tasks that are able to make progress as "active" and to tasks that are not as "waiting".
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};

pub struct Task {
    // the future this task drives to completion
    future: RefCell<Pin<Box<dyn Future<Output = ()>>>>,
    // sender half of the executor's queue, used to reschedule ourselves
    queue: flume::Sender<Rc<Task>>,
}

impl Task {
    pub fn poll(&self, cx: &mut Context) -> Poll<()> {
        self.future.borrow_mut().as_mut().poll(cx)
    }

    pub fn schedule(self: &Rc<Self>) {
        self.queue.send(Rc::clone(self)).unwrap()
    }
}
pub struct TaskQueue {
    sender: flume::Sender<Rc<Task>>,
    receiver: flume::Receiver<Rc<Task>>,
}

impl TaskQueue {
    pub fn new() -> Self {
        let (sender, receiver) = flume::unbounded();
        Self { sender, receiver }
    }

    pub fn schedule<F>(&self, future: F)
    where
        F: Future<Output = ()> + 'static,
    {
        let task = Task {
            future: RefCell::new(Box::pin(future)),
            queue: self.sender.clone(),
        };
        self.sender.send(Rc::new(task)).unwrap();
    }

    pub fn drain(&self) -> impl Iterator<Item = Rc<Task>> + '_ {
        self.receiver.try_iter()
    }

    pub fn is_done(&self) -> bool {
        // every live Task holds a Sender clone; only the queue's own
        // sender remains once all tasks have completed
        self.receiver.sender_count() == 1
    }
}
I'm using a flume::unbounded channel to keep track of active tasks in a TaskQueue. Each task owns a sender to the queue, so it can reschedule itself when it is ready to become active. We can track the total number of tasks we have left (even those that are waiting) by counting the number of Senders for our TaskQueue. Since the executor will always run all active tasks on each loop iteration, we can skip the traditional pop method and instead offer a drain method that returns an iterator over all values from the receiver. The schedule method is just a convenience for creating a Task from a Future and scheduling it.
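As a quick sanity check on that bookkeeping, here is how the sender counting plays out (a sketch using the types above):

let queue = TaskQueue::new();
assert!(queue.is_done()); // only the queue's own sender exists

queue.schedule(async {}); // the new Task clones a sender
assert!(!queue.is_done());

// draining hands the task to the executor; dropping a finished task
// drops its sender, and the queue reports done again
let tasks: Vec<_> = queue.drain().collect();
drop(tasks);
assert!(queue.is_done());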
Next up is the executor, which contains the event loop and some convenience functions for spawning tasks.
struct Executor {
    queue: TaskQueue,
}

impl Executor {
    fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + 'static,
    {
        self.queue.schedule(future);
    }

    fn block_on<F>(&self, future: F) -> F::Output
    where
        F: Future + 'static,
    {
        let (sender, receiver) = oneshot::channel();
        self.spawn(async move {
            let _ = sender.send(future.await);
        });
        self.run();
        receiver.recv().unwrap()
    }

    fn run(&self) {
        while !self.queue.is_done() {
            // execute all tasks
            for task in self.queue.drain() {
                let waker = todo!();
                let _ = task.poll(&mut Context::from_waker(&waker));
            }
            todo!(); // wait for io
        }
    }
}
Tasks can be added to the event loop with spawn, but if you also want to wait for the return value, you can use block_on - these are just wrappers that "send" the future to the TaskQueue.
It's important to note here that all methods on the executor take an immutable reference, which allows us to spawn tasks even from inside the event loop.
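For illustration, once run is filled in, driving a future to completion and getting its value back looks like this (assuming the Default impl on Executor used in the final example):

let exe = Executor::default();
let answer = exe.block_on(async { 40 + 2 });
assert_eq!(answer, 42);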
Looking at the implementation of run, there are still 2 pieces missing - creating the waker for the future and waiting for IO. The latter will be implemented as part of our reactor, so let's look at the waker first.
use std::task::{RawWaker, RawWakerVTable, Waker};

pub fn waker(task: Rc<Task>) -> Waker {
    let raw = Rc::into_raw(task).cast::<()>();
    let vtable = &WakerHelper::VTABLE;
    unsafe { Waker::from_raw(RawWaker::new(raw, vtable)) }
}

struct WakerHelper;

impl WakerHelper {
    const VTABLE: RawWakerVTable =
        RawWakerVTable::new(Self::clone, Self::wake, Self::wake_by_ref, Self::drop);

    unsafe fn clone(ptr: *const ()) -> RawWaker {
        let rc = Rc::from_raw(ptr.cast::<Task>());
        std::mem::forget(rc.clone());
        std::mem::forget(rc);
        RawWaker::new(ptr, &Self::VTABLE)
    }

    unsafe fn wake_by_ref(ptr: *const ()) {
        let rc = Rc::from_raw(ptr.cast::<Task>());
        rc.schedule();
        std::mem::forget(rc);
    }

    unsafe fn wake(ptr: *const ()) {
        let rc = Rc::from_raw(ptr.cast::<Task>());
        rc.schedule();
        drop(rc);
    }

    unsafe fn drop(ptr: *const ()) {
        drop(Rc::from_raw(ptr.cast::<Task>()));
    }
}
A waker is just a dynamic dispatch table with 4 methods: clone, wake, wake_by_ref and drop. The reason it's not implemented with a dyn trait is that clone is not actually object safe - so we have to do all this nasty raw-pointer stuff to make it work. Our underlying pointer for the waker is just an Rc<Task>, which makes all of these methods fairly easy to implement.
For clone, we just clone the Rc and mem::forget both of them to not decrement the ref counter.
For wake_by_ref, we call our schedule method and mem::forget the Rc to not decrement the ref counter.
For wake, we also call our schedule method but this time drop the Rc so the ref counter does get decremented.
And drop is straightforward.
This implementation is similar to ArcWake from the futures crate but uses Rc instead of Arc since I don't care about atomicity. But if you are looking for a safe way to create a waker, I definitely recommend that instead.
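For reference, the safe version via the futures crate looks roughly like this (a sketch; MyTask is a stand-in and the wake_by_ref body is up to your executor):

use std::sync::Arc;
use futures::task::{self, ArcWake};

struct MyTask;

impl ArcWake for MyTask {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // reschedule the task with your executor here
        let _ = arc_self;
    }
}

let waker = task::waker(Arc::new(MyTask));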
Wakers
Taking a step back and examining our waker more closely, we can see that the only functionality it really has is in the wake method (and consequently wake_by_ref). But this simple piece of code is actually the only interface bridging our executor and the reactor and any other futures it may be executing. From this point on we can design our reactor and futures with minimal concern for the event loop itself - all we really care about is that there is a magical waker that will reschedule futures that are able to make progress.
In fact, if we just plug our waker into the event loop, we actually have a working executor.
fn run(&self) {
    while !self.queue.is_done() {
        // execute all tasks
        for task in self.queue.drain() {
            let waker = waker(task.clone());
            let _ = task.poll(&mut Context::from_waker(&waker));
        }
        // TODO: wait for io
    }
}
Using the Future trait, we can pass our waker down to any future that is interested in it - even those that do not know/care about our event loop. Channels are a good example of futures that are completely executor agnostic - they just work.
pub fn main() {
    let exe = Executor::default();
    let (tx, rx) = futures::channel::oneshot::channel();

    exe.spawn(async {
        let _ = rx.await.unwrap();
    });
    exe.spawn(async {
        tx.send("hello").unwrap();
    });

    exe.run();
}