finger exercises: pipes, bytes, and fibers

In which I try to figure out how to pack and unpack bytes over an in-process pipe so that I can use it in some future message framing protocol for a worker pool. There will be a guest appearance by Fiber to simplify the parsing of messages in a non-blocking manner.

Some of the pieces I will need are IO.pipe, IO.select, Array#pack, String#unpack, Fiber.new, and Fiber#resume. We’ll start with pack.

Seems sensible. Array#pack takes a template string of “directives” describing how to encode each element of the array and returns a binary representation of the array. The representation is given to us as a String, but for our purposes we can pretend it is a byte array.
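A quick way to see what a directive does is to pack the same small integers with a few of them and look at the raw bytes. This is a minimal sketch; the choice of 'C', 'n', and 'N' is mine, picked to show 1-, 2-, and 4-byte encodings.

```ruby
# Pack the same integers with three different directives:
#   'C' = 8-bit unsigned, 'n' = 16-bit unsigned big-endian (network order),
#   'N' = 32-bit unsigned big-endian. The '*' applies the directive to every element.
values = [1, 2]

one_byte   = values.pack('C*')
two_bytes  = values.pack('n*')
four_bytes = values.pack('N*')

# pack returns a binary String; #bytes exposes it as an array of byte values.
p one_byte.bytes    # => [1, 2]
p two_bytes.bytes   # => [0, 1, 0, 2]
p four_bytes.bytes  # => [0, 0, 0, 1, 0, 0, 0, 2]
```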

It should be somewhat clear what unpack does, because it is the inverse of pack. When writing the network protocol, it is very important that the client and server agree on the directives used to encode each message; otherwise one side will misread what the bytes mean. I still don’t fully understand how big-endian vs. little-endian works out here, but I think as long as I’m consistent about which directive I use on each side I should be fine.
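To check that unpack really undoes pack, and to see what goes wrong when the two sides disagree, here is a small sketch. It assumes a length of 300 and uses 'v' (16-bit unsigned little-endian) purely as an example of a mismatched directive; unpack1 is the variant of unpack that returns only the first value.

```ruby
length = 300                      # 0x012C
packed = [length].pack('n')       # big-endian: high byte first

# unpack returns an array; unpack1 returns just the first value.
round_trip = packed.unpack1('n')  # reader uses the same directive

# A reader that (wrongly) uses 'v', 16-bit unsigned little-endian,
# reads the same two bytes in the opposite order: 0x2C01.
misread = packed.unpack1('v')

p packed.bytes   # => [1, 44]
p round_trip     # => 300
p misread        # => 11265
```

As long as both ends pick the same directive, the byte order is an agreement rather than something either side has to detect.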

I have to make a choice here about the size of the length prefix, which in turn caps the maximum message size. I’m going to use 2 bytes for the message length, which means the maximum message size will be 0xFFFF (65535 bytes). The directive that produces 2-byte output is 'n', which encodes an unsigned 16-bit integer in network (big-endian) byte order.
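Here is a minimal sketch of what a 2-byte length prefix could look like. frame, unframe, and MAX_MESSAGE_SIZE are hypothetical names of my own, not part of any library, and unframe assumes the buffer holds binary data.

```ruby
# Hypothetical framing helpers: each frame is a 2-byte length prefix
# (directive 'n', unsigned 16-bit big-endian) followed by the payload.
MAX_MESSAGE_SIZE = 0xFFFF # the largest length 2 bytes can express

def frame(message)
  payload = message.b # copy as raw bytes (ASCII-8BIT)
  if payload.bytesize > MAX_MESSAGE_SIZE
    raise ArgumentError, "message too large: #{payload.bytesize} bytes"
  end
  [payload.bytesize].pack('n') + payload
end

# Returns [message, rest] when `buffer` starts with a complete frame,
# or nil when more bytes are still needed.
def unframe(buffer)
  return nil if buffer.bytesize < 2
  length = buffer.unpack1('n')
  return nil if buffer.bytesize < 2 + length
  [buffer.byteslice(2, length), buffer.byteslice(2 + length, buffer.bytesize)]
end

framed = frame("hello") + frame("world")
first, rest = unframe(framed)
second, rest = unframe(rest)
p [first, second, rest]            # => ["hello", "world", ""]
p unframe(framed.byteslice(0, 4))  # => nil (prefix says 5 bytes, only 2 arrived)
```

Returning nil for an incomplete frame is the situation the reader has to cope with once bytes arrive over a pipe in pieces.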

On to the IO.pipe shenanigans. First, the general structure for simulating a producer/consumer model with a couple of threads.
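A minimal sketch of that skeleton, assuming one producer and one consumer sharing a single pipe. The reads and writes here are placeholders (a single "ping" and a blocking read until EOF) to be replaced by the real writer and reader.

```ruby
reader, writer = IO.pipe

producer = Thread.new do
  writer.write("ping")   # placeholder for the real writer logic
  writer.close           # closing the write end signals EOF to the reader
end

consumer = Thread.new do
  data = reader.read     # placeholder: blocks until EOF, returns everything
  reader.close
  data                   # becomes the thread's value
end

producer.join
received = consumer.value # Thread#value joins the thread and returns its result
puts "consumer received #{received.inspect}"
```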

Let’s fill in the writer thread. I’m going to send each message in two parts to simulate network slowness and force the reader to handle partial messages gracefully.
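One way the writer could look, as a sketch: it frames each message with the 'n' length prefix, writes the first half of the frame, sleeps briefly, then writes the rest. The message list, the 50 ms pause, and splitting at half the frame are arbitrary assumptions. To keep the example checkable on its own, it drains the pipe with a blocking read once the writer is done instead of running the real reader.

```ruby
reader, writer = IO.pipe
reader.binmode     # read raw bytes rather than text
writer.sync = true # each write goes straight to the pipe, no buffering

writer_thread = Thread.new do
  %w[hello world].each do |message|
    framed = [message.bytesize].pack('n') + message.b
    half = framed.bytesize / 2
    # Send each frame in two chunks with a short pause in between, so a
    # reader would see a partial frame first (a crude stand-in for a slow network).
    writer.write(framed.byteslice(0, half))
    sleep 0.05
    writer.write(framed.byteslice(half, framed.bytesize))
    STDOUT.puts "writer: sent #{message.inspect}"
  end
  writer.close # EOF tells the reader no more messages are coming
end

writer_thread.join
# Placeholder for the real reader: the whole stream is tiny, so it fits in the
# pipe's buffer and can simply be drained once the writer has finished.
raw = reader.read
reader.close
p raw.bytesize # => 14
```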

Now the reader. This is where Fiber makes its appearance. I highly recommend trying to write the reader without Fiber to see the pain points that Fiber solves.

So the main reader loop just uses IO.select to wait on the pipe and, when the pipe is ready for reading, passes it along to the fiber. Let’s see what the fiber looks like.
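A sketch of that main loop under a few assumptions: the fiber here is a simplified stand-in that just accumulates whatever bytes arrive (no framing), and it signals the end of the stream by returning :eof when read_nonblock raises EOFError, a convention of my own rather than anything Ruby defines.

```ruby
reader, writer = IO.pipe
writer.sync = true

# Stand-in fiber for the real parser: each time it is resumed with the pipe,
# it drains whatever bytes are currently available, then yields.
# Returns :eof (ending the fiber) once the writer has closed the pipe.
received = +""
consumer = Fiber.new do |io|
  loop do
    begin
      received << io.read_nonblock(4096)
    rescue IO::WaitReadable
      io = Fiber.yield # pipe is empty: wait to be resumed
    rescue EOFError
      break :eof
    end
  end
end

writer_thread = Thread.new do
  writer.write("hel")
  sleep 0.05
  writer.write("lo")
  writer.close
end

# Main reader loop: block in IO.select until the pipe is readable,
# then hand control to the fiber.
loop do
  IO.select([reader])
  break if consumer.resume(reader) == :eof
end
writer_thread.join
reader.close
p received # => "hello"
```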

Seems like a lot, but it is pretty simple. The fiber tries to read, without blocking, from the pipe it was given. If the pipe doesn’t yet hold all the bytes we need, we keep reading in a non-blocking manner: we read as many bytes as possible while the pipe has data, and yield back to the main IO.select loop when it is completely empty. An empty pipe is signalled by an exception (IO::WaitReadable), in which case we just yield and wait to be resumed.
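Here is one way such a fiber could be written, as a sketch rather than the definitive version. read_exactly and message_parser are hypothetical helpers; the parser appends complete messages to an array and ends by returning :eof when the writer closes the pipe (a frame cut off by EOF is simply dropped). To make the yielding visible without threads, the usage below drives the fiber by hand: it writes a partial frame, resumes, then writes the rest.

```ruby
# Hypothetical helper: read exactly `count` bytes from `io`, suspending the
# current fiber whenever the pipe is empty. Must be called inside a Fiber.
def read_exactly(io, count)
  buffer = "".b
  while buffer.bytesize < count
    begin
      # Ask only for the bytes still missing; may return fewer.
      buffer << io.read_nonblock(count - buffer.bytesize)
    rescue IO::WaitReadable
      Fiber.yield # pipe is empty: give control back to the select loop
    end
  end
  buffer
end

# Hypothetical parser: a fiber that turns a stream of length-prefixed frames
# into messages appended to `messages`. Returns :eof when the writer closes
# the pipe (a frame cut short by EOF is dropped).
def message_parser(messages)
  Fiber.new do |io|
    loop do
      length = read_exactly(io, 2).unpack1('n') # 2-byte big-endian prefix
      messages << read_exactly(io, length)       # then the payload itself
    end
  rescue EOFError
    :eof
  end
end

reader, writer = IO.pipe
writer.sync = true
messages = []
parser = message_parser(messages)

framed = [5].pack('n') + "hello".b
writer.write(framed.byteslice(0, 3))      # the prefix plus just one payload byte
first_result = parser.resume(reader)      # reads what it can, then yields nil
p messages                                # => []

writer.write(framed.byteslice(3, framed.bytesize)) # the remaining 4 bytes
writer.close
second_result = parser.resume(reader)     # finishes the frame, then hits EOF
p messages                                # => ["hello"]
p second_result                           # => :eof
reader.close
```

In the threaded version, the main IO.select loop would call parser.resume(reader) each time the pipe is readable and stop once it returns :eof; the partial-read bookkeeping stays inside read_exactly instead of leaking into the loop.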

As a skeleton, this makes a nice playground for further experiments. Here’s some output to demonstrate how the reader and writer interact.

Notice the interleaving of the STDOUT.puts lines: the threads really are running concurrently and stepping over each other.

Factoring out the pieces to make things more modular is left as an exercise for the reader.