
Inter-thread messaging is a fundamental part of any asynchronous system. It is the component responsible for the transportation of data between threads. Messaging forms the infrastructure, scaffolding multi-threaded applications, and just like real-world transport infrastructure, we want it to be inexpensive, fast, reliable, and clean. For QuestDB, we wrote our own messaging system, and this post is about how it works and how fast it is.
# Architecture
Borrowing heavily from the world-famous Disruptor, our messaging revolves around multiple threads accessing a shared circular data structure, which we call RingQueue. Semantically, RingQueue provides unbounded, index-based, random access to its elements. It does not coordinate concurrent access, nor does it provide any thread-safety guarantees. Coordination and thread safety are the concern of Sequences, which are responsible for providing indices that can be used to access RingQueue concurrently and safely.
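To make this division of labour concrete, here is a minimal sketch of a ring-shaped container. It assumes a power-of-two capacity so that an ever-growing index can be mapped to a slot with a bit mask. `ToyRingQueue` and its methods are illustrative names, not QuestDB's actual class. Like the RingQueue described above, it does no coordination of its own.

```java
import java.util.function.Supplier;

// Hypothetical illustration only; this is not QuestDB's RingQueue.
final class ToyRingQueue<T> {
    private final Object[] slots;
    private final int mask;

    ToyRingQueue(Supplier<T> factory, int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new Object[capacity];
        mask = capacity - 1;
        // Pre-allocate every element once; callers mutate them in place.
        for (int i = 0; i < capacity; i++) {
            slots[i] = factory.get();
        }
    }

    // Any non-negative index is accepted; it wraps around the ring.
    // No locking and no checks: safe access is the caller's problem.
    @SuppressWarnings("unchecked")
    T get(long index) {
        return (T) slots[(int) (index & mask)];
    }

    int capacity() {
        return slots.length;
    }
}
```

With a capacity of 4, indices 1 and 5 resolve to the same slot, which is exactly why something else has to decide when a slot may be reused.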
To help sequences do their magic they have to be shaped into a graph. We start with syntax to chain sequences together:
a.then(b).then(c).then(d)
The result is a trivial sequence graph:
a -> b -> c -> d
To branch, we use the helper class FanOut:
a.then(FanOut.to(b).and(c)).then(d)
The result is a sequence graph in which a fans out to b and c, and both branches rejoin at d:
a -> b -> d
a -> c -> d
These two pieces of syntax are flexible enough to create any desired flow. This example shows that a FanOut can contain chains of sequences and other FanOuts:
a.then(FanOut.to(FanOut.to(b).and(c)).and(d.then(e))).then(f)
It is quite a mouthful, but it creates this nice little graph, in which three branches leave a and rejoin at f:
a -> b -> f
a -> c -> f
a -> d -> e -> f
FanOut can also be used as a placeholder in a chain to allow threads to subscribe and unsubscribe on the fly. Dynamic subscription then amounts to simply adding a new sequence to the FanOut.
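The idea of subscribing on the fly can be sketched with a toy model. The classes below are hypothetical stand-ins, not QuestDB's FanOut or sequence classes, and the `remove` and `slowest` methods are assumptions made for illustration. The sketch only shows that the upstream side has to wait for the slowest branch currently attached.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a consumer sequence.
final class ToyCursor {
    // Last index this consumer has fully processed.
    final AtomicLong processed = new AtomicLong(-1);
}

// Hypothetical stand-in for a fan-out point in the graph.
final class ToyFanOut {
    // Copy-on-write lets branches come and go while others iterate.
    private final List<ToyCursor> branches = new CopyOnWriteArrayList<>();

    // Subscribe: attach another branch.
    ToyFanOut and(ToyCursor cursor) {
        branches.add(cursor);
        return this;
    }

    // Unsubscribe: detach a branch.
    void remove(ToyCursor cursor) {
        branches.remove(cursor);
    }

    // Slots up to this index have been released by every attached branch.
    // With no branches attached there is nothing to wait for.
    long slowest() {
        long min = Long.MAX_VALUE;
        for (ToyCursor cursor : branches) {
            min = Math.min(min, cursor.processed.get());
        }
        return min;
    }
}
```

A real implementation would also have to decide where a new subscriber starts reading; this sketch leaves that to the caller, who sets `processed` before attaching the cursor.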
A typical graph must contain a single producer sequence and one or more consumer sequences. It also has to be circular, i.e. start and end with the producer sequence. The graph has to be circular because the underlying data structure, RingQueue, is circular. Without the loop-back, the producer would be liable to overwrite queue elements before consumers had a chance to read them. Worse still, queue elements could be written to and read from concurrently. We don't want that to happen, right?
To help build practical sequence graphs, we implemented four types of sequences to play with. They are best understood as combinations of type and property: SP (single producer), MP (multiple producer), SC (single consumer) and MC (multiple consumer). Multi- sequences allow concurrent access and guarantee that no two threads can retrieve the same index. It is this property that adds an extra fun dimension to sequence graphs. Consider this graph:
A -> B -> A
or, in Java notation:
A.then(B).then(A)
When "B" is an instance of MCSequence(), we have a self-balancing worker pool. When "A" is an MPSequence(), we have a many-to-many pub-sub system. Cool, eh?
Single- sequences are faster but they are not thread-safe. They should be preferred for single-threaded consumer models.
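The guarantee that no two threads retrieve the same index can be illustrated with a compare-and-set on a shared counter. The sketch below is a toy, not QuestDB's MCSequence: `ToyMCSequence` and `ToyWorkerPool` are hypothetical names, the upstream producer is reduced to a fixed barrier, and releasing slots back to the producer is not modelled. It borrows the -1 and -2 return codes described further down.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical multi-consumer sequence, for illustration only.
final class ToyMCSequence {
    private final AtomicLong claimed = new AtomicLong(-1); // last index handed out
    private final AtomicLong barrier;                      // last index published upstream

    ToyMCSequence(AtomicLong barrier) {
        this.barrier = barrier;
    }

    long next() {
        long current = claimed.get();
        long candidate = current + 1;
        if (candidate > barrier.get()) {
            return -1; // nothing left to consume
        }
        // Only one thread can win the CAS for a given index.
        // Losers get -2 and decide for themselves what to do next.
        return claimed.compareAndSet(current, candidate) ? candidate : -2;
    }
}

final class ToyWorkerPool {
    // Lets `workers` threads drain `items` already-published indices and
    // returns how many times each index was claimed.
    static int[] drain(int workers, int items) {
        ToyMCSequence seq = new ToyMCSequence(new AtomicLong(items - 1));
        AtomicIntegerArray hits = new AtomicIntegerArray(items);
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                long cursor;
                while ((cursor = seq.next()) != -1) {
                    if (cursor >= 0) {
                        hits.incrementAndGet((int) cursor);
                    }
                    // cursor == -2: lost a race, simply try again
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int[] counts = new int[items];
        for (int i = 0; i < items; i++) {
            counts[i] = hits.get(i);
        }
        return counts;
    }
}
```

Whichever worker is free grabs the next index, which is where the self-balancing behaviour of the worker pool comes from.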
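Let's take a look at how threads interact with sequences. A typical publisher calls Sequence.next() to obtain an index, writes the queue item at that index, and then calls done(). Apart from a valid index, the return values of Sequence.next() are: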
-1 Queue is unavailable. It is either full or empty, depending on whether it is a producer or a consumer sequence.
-2 Temporary race condition. The sequence failed a CAS and delegated the decision to your application.
Consumer sequence interaction is almost identical. The only difference is that the consumer reads the queue item instead of writing it.
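The publisher and consumer interaction can be sketched with a deliberately simplified, single-threaded model. `ToySequence` and `ToyPipeline` are hypothetical and are not QuestDB's classes. Only the `next()` and `done()` names and the -1 return code come from the description above. The `headroom` parameter is an assumption of this sketch: the producer may run a full queue length ahead of the consumer, while the consumer may never overtake the producer.

```java
// Hypothetical single-threaded model of a sequence; plain fields, no atomics.
final class ToySequence {
    private final long headroom;  // how far ahead of upstream we may run
    private ToySequence upstream; // the sequence we must not overtake
    private long claimed = -1;    // last index returned by next()
    private long released = -1;   // last index passed to done()

    ToySequence(long headroom) {
        this.headroom = headroom;
    }

    // Chains sequences: pub.then(sub).then(pub) closes the loop.
    ToySequence then(ToySequence downstream) {
        downstream.upstream = this;
        return downstream;
    }

    long next() {
        long candidate = claimed + 1;
        if (candidate > upstream.released + headroom) {
            return -1; // full for a producer, empty for a consumer
        }
        claimed = candidate;
        return candidate;
    }

    void done(long index) {
        released = index;
    }
}

final class ToyPipeline {
    static boolean tryPublish(ToySequence pubSeq, long[] queue, long value) {
        long cursor = pubSeq.next();
        if (cursor < 0) {
            return false; // -1 here; a concurrent sequence could also say -2
        }
        queue[(int) (cursor % queue.length)] = value; // write the item
        pubSeq.done(cursor);                          // make it visible downstream
        return true;
    }

    // Returns Long.MIN_VALUE when there is nothing to read.
    static long tryConsume(ToySequence subSeq, long[] queue) {
        long cursor = subSeq.next();
        if (cursor < 0) {
            return Long.MIN_VALUE;
        }
        long value = queue[(int) (cursor % queue.length)]; // read the item
        subSeq.done(cursor);                               // hand the slot back
        return value;
    }
}
```

Wiring the pair as `pubSeq.then(subSeq).then(pubSeq)` with a four-slot queue lets the publisher write four items before `next()` starts returning -1; consuming one item frees exactly one slot.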
Performance of single-threaded sequences can benefit further from batching. Batching relies on receiving a range of indices from the sequence and calling done() once at the end of the batch rather than for every queue item. Consumer and producer code look the same in this respect.
Multi-threaded sequences do not support batches.
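As a rough illustration of batching on the consumer side, the sketch below reads a whole range of indices and releases it with a single `done()` call. `ToyBatchSequence`, `available()`, `lastReleased()` and `publishUpTo()` are hypothetical names chosen for this example and should not be read as QuestDB's API. The model is single-threaded and the producer is reduced to a plain counter.

```java
// Hypothetical single-threaded consumer sequence with batch support.
final class ToyBatchSequence {
    private long released = -1;          // last index passed to done()
    private long upstreamPublished = -1; // stands in for the producer sequence
    int doneCalls = 0;                   // counts releases, for demonstration

    // Simulates the producer publishing everything up to `index`.
    void publishUpTo(long index) {
        upstreamPublished = index;
    }

    long lastReleased() {
        return released;
    }

    // Highest index ready to be read, or -1 when the queue is empty.
    long available() {
        return upstreamPublished > released ? upstreamPublished : -1;
    }

    void done(long index) {
        released = index;
        doneCalls++;
    }
}

final class ToyBatchConsumer {
    // Sums every available item and releases the whole range at once.
    static long drain(ToyBatchSequence seq, long[] queue) {
        long hi = seq.available();
        if (hi < 0) {
            return 0; // nothing to do
        }
        long sum = 0;
        for (long i = seq.lastReleased() + 1; i <= hi; i++) {
            sum += queue[(int) (i % queue.length)];
        }
        seq.done(hi); // one release for the whole batch
        return sum;
    }
}
```

The saving comes from touching the shared sequence state once per batch instead of once per item.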
# Performance
I used Shipilev's project, which already had a Disruptor benchmark, and added a QuestDB implementation of the same pipeline.
Benchmark source on GitHub
2 CPU MBP 2015
4 CPU x5960 @ 4.2Ghz
Disruptor and QuestDB perform essentially the same.
# How to get it
Our messaging system is on Maven Central as part of QuestDB. Don't worry about package size, though: the QuestDB jar is around 3.6MB and has no dependencies. Jump to the GitHub release page for version reference.