A Simple In-Memory Store with Consistent Reads

Problem Statement

Suppose you want to write a simple in-memory JSON store with an equally simple socket-based protocol, and you want the store to support parallel, consistent reads. By “parallel reads” I mean that if 10 clients request data from the store, no client should block any other client. By “consistent reads” I mean that when a client requests data there is no way it can receive half of the data from before a write and half from after a write, and that there is some kind of ordering between reads and writes. In other words, if we have an array “[1,2,3,4]” stored under the key “ints” in our JSON store, then the following sequence of events is impossible:

  1. Client A connects and requests “ints”
  2. Server starts to serialize the array to send it back and gets as far as “[1,2” before another client, Client B, connects
  3. Client B asks the server to modify the array and set the element at index 0 to 5 and the element at index 3 to 1
  4. Server commits Client B’s request resulting in a new array “[5,2,3,1]”
  5. Server continues to serialize the data for Client A and finishes with “[1,2,3,1]”

So when Client A initiated the connection and asked for the data, the array was “[1,2,3,4]”, but what it got back was “[1,2,3,1]”. Just by luck of the draw it saw the modification at index 3 but not the one at index 0, because of how the serialization process was interleaved with the server’s handling of client connections. That’s a simplified sequence of events, and even more bizarre things can happen if you’re not careful with how you handle access to the store.
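To make that interleaving concrete, here’s a contrived little simulation of the five steps above, separate from any server code: serialize half of the array, let Client B’s write land, then finish serializing.

# Contrived simulation of the interleaving above: serialize half of the
# array, commit Client B's write, then finish serializing.
ints = [1, 2, 3, 4]

first_half = ints[0, 2].join(",")      # server emits "1,2" for Client A
ints[0] = 5                            # Client B's write is committed
ints[3] = 1
second_half = ints[2, 2].join(",")     # serialization resumes on the mutated array

puts "[#{first_half},#{second_half}]"  # => [1,2,3,1], neither the old nor the new array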

Here’s some broken server code. It doesn’t quite demonstrate the scenario I described above, but it’s still unsafe in many ways and provides no guarantees about the ordering of reads and writes:

require 'socket'
require 'json'

##
# Client request handlers.

module ClientHandler

  def self.handle_read(client, store)
    puts "Handling read request"
    stringified_store = store.read
    # The length prefix is the payload size in bytes, packed as a 64-bit unsigned integer.
    packed_length = [stringified_store.bytesize].pack("Q*")
    client.write packed_length
    client.write stringified_store
    client.shutdown(:RDWR)
  end

  def self.handle_write(client, store)
    puts "Handling write request"
    # Read the 8 byte length prefix, then that many bytes of JSON payload.
    payload_length = client.read(8).unpack("Q*")[0]
    payload = client.read(payload_length)
    store.write(payload)
    client.shutdown(:RDWR)
  end

end

##
# Simple wrapper around a hash map.

class JSONStore

  @@store = {}

  def self.write(stringified_json)
    @@store = JSON::parse(stringified_json)
  end

  def self.read
    @@store.to_json
  end

end

##
# Threaded server loop.

server = TCPServer.new(2000)
loop do
  Thread.start(server.accept) do |client|
    begin
      case client.getc
      when 'R'
        puts "Read Request"
        ClientHandler::handle_read(client, JSONStore)
      when 'W'
        puts "Write Request"
        ClientHandler::handle_write(client, JSONStore)
      else
        puts "Unknown Request: closing socket."
        client.shutdown(:RDWR)
      end
    rescue Exception => e
      puts "Something went wrong!"
      puts e
      client.shutdown(:RDWR)
    end
  end
end

The code should be pretty readable, but in case it’s not, here’s a quick tour. From top to bottom we have the following pieces of functionality: a module that handles client requests by reading from and writing to the store, the store itself, and the server loop. The server/client protocol is pretty simple. For a read request the client writes the character ‘R’ and the server responds with a serialized version of the store prefixed by 8 bytes; the 8 byte prefix is the length of the payload that the client should read after the first 8 bytes. Pretty much the dual applies for writes: the client writes the character ‘W’, followed by an 8 byte prefix giving the length of the payload, followed by the payload itself.
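To make the protocol concrete, here’s a minimal client sketch. It isn’t part of the server code, just an illustration, and it assumes the server above is listening on port 2000 on the same machine.

require 'socket'
require 'json'

# Write request: 'W', an 8 byte length prefix (pack("Q*"), matching the server),
# then the JSON payload itself.
payload = { "ints" => [1, 2, 3, 4] }.to_json
writer = TCPSocket.new('localhost', 2000)
writer.write 'W'
writer.write [payload.bytesize].pack("Q*")
writer.write payload
writer.close

# Read request: 'R', then read the 8 byte length prefix and that many bytes back.
reader = TCPSocket.new('localhost', 2000)
reader.write 'R'
length = reader.read(8).unpack("Q*")[0]
puts reader.read(length)   # e.g. {"ints":[1,2,3,4]}
reader.close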

The above code is broken in many different ways, but the most important one is that it provides absolutely no guarantees about how reads and writes happen or in what order. Depending on how the JSON parsing and serialization are interleaved, we might even corrupt the store. This is obviously no good, and we would like to fix it up to the point where we can provide somewhat stronger guarantees about reads and writes, and avoid corrupting the store, without giving up parallel reads.

Possible Fixes

There are several avenues of approach to fixing up the server code. You could rewrite it in a language where the mutability of the store is not an issue and everything is referentially transparent, i.e. when we bind a value to a variable and pass it around, we know there is no way it can change out from under us halfway through the computation. Haskell and Erlang come to mind as languages where you can assume these things about your variables. But let’s say you don’t want to write it in Haskell or Erlang.

The next thing you could do is read up on locks and mutexes and start peppering the code with locks to create critical sections that only one thread can enter at a time. This would also work to an extent, but it is notoriously hard to write correct code when locks are involved, and depending on the circumstances the extra complexity might not even be worth it because of the performance hit from the serialization that locks introduce.
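For illustration, here’s roughly what a lock-based store might look like. This is just a sketch, not the approach this post ends up taking: a single Mutex serializes every read and write, which buys consistency but gives up parallel reads entirely.

require 'json'

##
# Lock-based variant of the store (sketch). Every access goes through one
# Mutex, so readers and writers all serialize behind each other.

class LockedJSONStore

  @@store = {}
  @@lock  = Mutex.new

  def self.write(stringified_json)
    @@lock.synchronize { @@store = JSON::parse(stringified_json) }
  end

  def self.read
    @@lock.synchronize { @@store.to_json }
  end

end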

The Actual Fix

If you go back and look at the code you’ll notice that we are using threads as our concurrency mechanism, but that’s not the only option. We could also use process-based concurrency with the help of “fork”. Before Ruby 2.0 this would have added a little too much overhead in terms of memory, but Ruby 2.0 introduced a new garbage collection mechanism that is much friendlier to Copy-on-Write, so we pay less of a memory penalty for forking a Ruby process. You can read more about the new garbage collection algorithm on Pat Shaughnessy’s blog, where he explains it far better than I could.

So now that we have that out of the way let’s fix the server code and convince ourselves that it does the right thing. Here’s the new server loop:

##
# Forking server loop.

server = TCPServer.new(2000)
loop do
  client = server.accept
  begin
    case r = client.getc
    when 'R'
      puts "Read Request"
      pid = fork { ClientHandler::handle_read(client, JSONStore); exit }
      Process.detach(pid) # reap the child so it doesn't linger as a zombie
      client.close        # the parent's copy of the socket; the child has its own
    when 'W'
      puts "Write Request"
      ClientHandler::handle_write(client, JSONStore)
    else
      puts "Unknown Request: #{r}."
      client.shutdown(:RDWR)
    end
  rescue Exception => e
    puts "Something went wrong!"
    puts e
    client.shutdown(:RDWR)
  end
end

Let’s go through the logic and convince ourselves that we now have a correct ordering of reads and writes and no chance of corrupting the store. Suppose we have the following sequence of requests: “R, R, W: {"int": 2}, R, R”. In the original broken code that sequence could have been executed in any order you can imagine: reads could have happened before the write, after it, or even in reverse chronological order. In the new server loop the order of the reads before the write does not matter, but all of them will see the empty JSON object. This is because every read happens in a forked process, and a forked process gets a “copy” of the entire memory space of its parent process. I say “copy” because Copy-on-Write only copies data when it is modified, so if there are no modifications we don’t pay a memory penalty for the forked process. The reads also don’t block each other because they run in separate processes, in parallel.

When we get to a write request we don’t fork an extra process; we handle the request in the main process. This means that all new incoming read requests have to wait until the write is committed to the store, because the main loop isn’t accepting connections while it handles the write. Once the write is committed, the loop goes back to accepting connections, forks processes for the queued reads, and all subsequent reads see “{"int": 2}” until the next write request, at which point the same logic applies.
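Here’s a small, self-contained demonstration of the snapshot semantics fork gives us, separate from the server code: the child sees the data as it was at the moment of the fork, no matter what the parent does afterwards.

require 'json'

store = { "ints" => [1, 2, 3, 4] }

reader = fork do
  sleep 0.1                              # give the parent time to mutate its copy
  puts "child sees:  #{store.to_json}"   # => {"ints":[1,2,3,4]}
end

store["ints"][0] = 5                     # invisible to the already-forked child
store["ints"][3] = 1
puts "parent sees: #{store.to_json}"     # => {"ints":[5,2,3,1]}

Process.wait(reader)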

Conclusion

So what we essentially did was use the memory semantics of forked processes to emulate immutable data, and along the way we introduced an ordering between the set of read requests before a write and the set of read requests after it. Not too shabby for about 60 lines of code. Making it production ready is left as an exercise for the reader.