Friday 13 December 2013

JAQ: Using JMH to Benchmark SPSC Queues Latency - Part I

{This post is part of a long running series on lock free queues, checkout the full index to get more context here}
{If you come here looking for JMH related content start at the new and improved JMH Resources Page and branch out from there!}
As part of the whole JAQ venture I decided to use the excellent JMH (Java Microbenchmark Harness, see: project here, introduction to JMH here, and multi-threaded benchmarking here). I chose JMH because I believe hand-rolled benchmarks limit our ability to peer review, and picking a solid framework allows me and collaborators to focus on the experiment rather than the validity of the harness. One of the deciding factors for me was JMH's focus on multi-threaded benchmarks, something I've not seen other frameworks tackle and which poses its own set of problems for the aspiring benchmarker. It's not perfect, but it's the closest we have.
Anyways... that's the choice for now, let's jump into some benchmarks!

Using the JMH Control

Benchmarking the latency of 2 threads communicating via a queue requires the JMH Control structure. This is demonstrated in the JMH samples by measuring thread-to-thread "ping pong" via AtomicBoolean.compareAndSet (see original here):
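Since the code embed doesn't carry over here, the following is a minimal sketch of that sample. The class name PingPongSketch is mine and it's written against a recent JMH API (the 2013-era annotation and package names differed slightly); the jaq-benchmarks BaselinePingPong referenced in the results below is essentially the same idea:

import java.util.concurrent.atomic.AtomicBoolean;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Control;

@State(Scope.Group)
public class PingPongSketch {
    private final AtomicBoolean flag = new AtomicBoolean();

    @Benchmark
    @Group("pingpong")
    public void ping(Control cnt) {
        // spin on the CAS until it succeeds, or bail out when the iteration is over
        while (!cnt.stopMeasurement && !flag.compareAndSet(false, true)) {
            // retry
        }
    }

    @Benchmark
    @Group("pingpong")
    public void pong(Control cnt) {
        // the mirror image: flip the flag back from true to false
        while (!cnt.stopMeasurement && !flag.compareAndSet(true, false)) {
            // retry
        }
    }
}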

The challenge here is that the CAS operation may fail, and when it does we want to retry until we succeed. BUT if the opposite thread is stopped/suspended because the benchmark iteration is done, we would be stuck in an infinite loop. To avoid this issue a benchmarked method can take a Control parameter, which exposes a flag indicating the end of a run; this flag can be used to detect benchmark termination and thus avoid the above issue. You'll notice other JMH annotations are in use above:
  • State: sets the scope of the benchmark state, in this case the Group
  • Group: multi-threaded benchmarks can utilize thread groups of different sizes to exercise a concurrent data structure with different operations. In the case above we have one group called pingpong. Both methods are exercised by threads from the group, using the default of 1 thread per method.
  • Note that if we were to run the above benchmark with more threads, we would be running several independent groups. Running with 10 threads would result in 5 concurrent benchmark runs, each with its own group of 2 threads.
I pretty much copied this benchmark into the jaq-benchmarks and refer to it as a baseline for inter-thread message exchange latency. The results for this benchmark are as follows (running on the same core):
Benchmark                                  Mode Thr     Count  Sec         Mean   Mean error    Units
i.j.s.l.BaselinePingPong.pingpong          avgt   2        10    1       23.578        0.198    ns/op
i.j.s.l.BaselinePingPong.pingpong:ping     avgt   2        10    1       23.578        0.198    ns/op
i.j.s.l.BaselinePingPong.pingpong:pong     avgt   2        10    1       23.578        0.198    ns/op

And running across different cores:
Benchmark                                  Mode Thr     Count  Sec         Mean   Mean error    Units
i.j.s.l.BaselinePingPong.pingpong          avgt   2        10    1       60.508        0.217    ns/op
i.j.s.l.BaselinePingPong.pingpong:ping     avgt   2        10    1       60.496        0.195    ns/op
i.j.s.l.BaselinePingPong.pingpong:pong     avgt   2        10    1       60.520        0.248    ns/op


The way I read it, a CAS is sort of a round trip to memory: read and compare the current value, then set it to a new value. You could argue that it's only a one-way trip, and I can see merit in that argument as well. The point is, these are the ballpark figures. We shouldn't be able to beat this score, but if we manage to get close we are doing well.
Note from a hardware point of view: See this answer on Intel forums on cache latencies. Latencies stand at roughly:
"L1 CACHE hit, ~4 cycles
 L2 CACHE hit, ~10 cycles
 L3 CACHE hit, line unshared ~40 cycles
 L3 CACHE hit, shared line in another core ~65 cycles
 L3 CACHE hit, modified in another core ~75 cycles
 remote L3 CACHE ~100-300 cycles
 Local Dram ~60 ns
 Remote Dram ~100 ns"
To convert cycles to time you need to divide by your CPU clock speed; how the delay manifests will also depend on whether or not you need the data in order to make progress, etc. The thing to keep in mind here is that there is a physical limit to the achievable latency. If you think you're going faster... check again ;-).
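As a rough worked example, assuming the fixed 2.40GHz clock of the machine used for the results below (purely illustrative arithmetic, not part of the benchmarks):

// Rough cycles-to-nanoseconds conversion at a fixed 2.40GHz clock (no turbo).
public class CyclesToNanos {
    public static void main(String[] args) {
        double cyclesPerNs = 2.40; // 2.40GHz => 2.4 cycles per nanosecond
        int[] cycles = {4, 10, 40, 65, 75}; // L1, L2, L3 unshared, L3 shared, L3 modified
        for (int c : cycles) {
            System.out.printf("%3d cycles ~= %4.1f ns%n", c, c / cyclesPerNs);
        }
    }
}

At that clock a ~65 cycle shared-line L3 hit works out to roughly 27ns, which is the order of magnitude to keep in mind when reading the ping-pong numbers above.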

Queue round trip latency benchmark

Queues are not about exchanging a single message, they are about exchanging many messages asynchronously (at least that's what the queues I care about are about). Exchanging a single message is a worst case scenario for a queue, there's no gain to be had by batching, no false sharing to avoid, nothing much to cache, very little cost to amortize. It is therefore a valuable edge case, and an important one for many real world applications where latency needs to be minimized for bursty traffic/load. To put my queues to the test I devised a round trip benchmark which allows us to chain a configurable number of threads into a ring and measure the round trip latency. This should help magnify the exchange costs and perhaps expose behaviours not so easily detected in the minimal case. Here's an outline of the functionality under test given a chain length N (which requires N queues and N threads including the measuring thread) and a burst of K messages:
  1. From thread[0] send K objects down queue[0].
  2. Concurrently: each thread i (0 < i < N) reads from queue[i-1] and writes to queue[i].
  3. Thread[0] waits for K objects to be polled from queue[N-1] and returns.
There were several challenges/intricacies to be considered here:
  • The number of benchmarking threads is unknown at runtime (in JMH), so chain length must be defined separately.
  • Setup/Teardown methods on the Iteration level are called per thread.
  • Thread state is allocated and used per thread per iteration. The threads are pooled, so a given thread can switch 'roles' on each iteration. In other words @State(Scope.Thread) is not ThreadLocal.
  • Concurrent polls to SPSC queues can leave the queue in an undefined state.
  • The benchmark requires that the queues are empty when we start measuring, so at the beginning or end of an iteration we must clear the queues.
  • Queues must be cleared from the consumer thread.
So things are not that simple, and the code (full code here) is an attempt to cover all these intricacies:
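As the code embed doesn't carry over here, below is a much-simplified, hypothetical stand-in for the chain length 2 case only (one source, one link). It uses Scope.Group state and a ConcurrentLinkedQueue placeholder rather than the parametrized queues, so it conveniently sidesteps most of the intricacies listed above; the class and parameter names are mine, see the full code link for the real benchmark:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Control;

@State(Scope.Group)
public class RoundTripSketch {
    private static final Object TOKEN = new Object();

    @Param("1")
    public int burstSize;

    // source -> link and link -> source; CLQ is a stand-in for the SPSC queues under test
    private final Queue<Object> outbound = new ConcurrentLinkedQueue<Object>();
    private final Queue<Object> inbound = new ConcurrentLinkedQueue<Object>();

    @Benchmark
    @Group("ring")
    public void source(Control ctl) {
        // send the whole burst, then wait for it to come all the way around the ring
        for (int i = 0; i < burstSize; i++) {
            outbound.offer(TOKEN);
        }
        for (int i = 0; i < burstSize; i++) {
            while (inbound.poll() == null && !ctl.stopMeasurement) {
                // spin until the message returns or the iteration ends
            }
        }
    }

    @Benchmark
    @Group("ring")
    public void link(Control ctl) {
        // pass a single message along the ring per invocation
        Object e = null;
        while (e == null && !ctl.stopMeasurement) {
            e = outbound.poll();
        }
        if (e != null) {
            inbound.offer(e);
        }
    }

    @TearDown(Level.Iteration)
    public void drainQueues() {
        // iterations can end mid-burst, so clear leftovers; note that real SPSC queues must
        // only be cleared from the consumer thread (see the list above), which this
        // CLQ-based simplification conveniently ignores
        outbound.clear();
        inbound.clear();
    }
}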

I'm relying on correct usage for some things, but JMH is basically doing most of the heavy lifting:
  • JMH is taking care of all the measurements/measurement modes/reporting/formatting including forked runs and stats summaries and all.
  • JMH is taking care of thread management, no need to spin up threads/executors and shut them down.
  • JMH is synchronizing iterations and managing per thread state setup/teardown. Note the @TearDown annotation in Link/Source.
  • JMH allows the group thread balance to be parametrized (a recent addition), which enables generic asymmetric multi-threaded benchmarks like the above.
Where I have to work around the framework:
  • I would like to only specify the chain.length and derive the thread allocation from that. This is possible via the JMH launcher API (see the sketch after this list).
  • I would like @State(Scope.Thread) to mean ThreadLocal.
  • I would like to be able to pass State down the stack as constructor parameters, so I could avoid the static field for sharing benchmark state between thread state objects.
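To illustrate that first point, something along these lines should be possible with the JMH launcher API; the include pattern and the parameter name below are assumptions, not the actual jaq-benchmarks names:

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class RoundTripMain {
    public static void main(String[] args) throws RunnerException {
        int chainLength = 2; // 1 source thread + (chainLength - 1) link threads
        Options opts = new OptionsBuilder()
                .include(".*RoundTrip.*")                          // assumed benchmark name pattern
                .param("chainLength", String.valueOf(chainLength)) // assumed @Param name
                .threadGroups(1, chainLength - 1)                  // source/link thread balance
                .threads(chainLength)
                .build();
        new Runner(opts).run();
    }
}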
I'd be lying if I said writing this benchmark the JMH way was easier than hacking something together; it takes getting used to, and some of the features I required are fairly late additions. But the benefit is worth it.
I can't stress this point enough: the objective here is more than collecting data, it is about enabling, supporting and empowering peer review. I want the benchmarks to be as simple as they can be so people can either be easily convinced or easily point out my errors. The less code I need my peers to read, the easier it is for anyone to build/run/tweak the benchmarks, and the less we rely on hand-rolled/half-assed attempts at benchmark frameworks, THE BETTER (having said that, you've got to do what you've got to do).
Note: If you care about latency you should watch Gil Tene's excellent talk on How Not To Measure Latency, where he discusses coordinated omission and its impact on latency measurement. As you may have noticed, this experiment suffers from coordinated omission. I'll have to live with that for now, but I don't feel the data gained from a corrected measurement would impact the queue implementations in any form.

Hard Numbers? Numbers Hard?

We've got the following queues:
  1. ConcurrentLinkedQueue (CLQ) - Good old JDK lock free queue, for reference. 
  2. InlinedCountersSpscConcurrentArrayQueue (CAQ) - My inlined counter variation on Martin Thompson's ConcurrentArrayQueue previously discussed/explained/explored here and compared to this variation here. It can be run with sparse data as described here.
  3. FFBuffer (FF1) - A Java port of the FastFlow SPSC implementation with sparse data as described here.
  4. BQueue (BQ) - An SPSC queue previously discussed here, which introduces an offer-side batching of the null check (which also serves as protection from the false sharing induced by a full queue) and a poll-side batching of the emptiness test, which also includes a spinning backoff to avoid the near-empty-queue issue.
  5. FFBufferWithOfferBatch (FF2) - A BQueue on the offer side and an FFBuffer on the poll side, this is a new addition to the arsenal.
JMH gives us 2 relevant benchmark modes for the above use case:
  1. AverageTime - Average time per operation (from the command line use '-bm avgt')
  2. SampleTime - Time distribution, percentile estimation (from the command line use '-bm sample')
And we could also look at same-core vs. cross-core latency, vary the length of the chain, and examine the effect of sparse data on latency. This is a bit of an explosion of data, too much for one post, but let's dip our toes...
I'll start with the average RTT latency for each type, running same core and cross core:

The benchmarks were run on my laptop (JDK7u45/Ubuntu13.10/i7-4700MQ@2.40GHz no turbo boost/benchmark process pinned using taskset) and though I ran them repeatedly I didn't systematically collect run to run variance data, so consider the results subject to some potential run to run variance.
As we can see:
  • FF2/FF1 are very similar and take the cake at 60/230ns (FF2 is marginally faster).
  • BQ is the worst [392/936ns] because of its back-off-on-approaching-near-empty strategy.
  • It is perhaps surprising that although CAQ/FF1/BQ showed very similar throughput in my previous tests, their latency characteristics are so distinct.
  • Note that CLQ is actually not too bad latency wise (in this particular use case). 
Let's have a look at what happens when the burst size increases [I dropped FF1 out of the race here, too similar to FF2; this is running across cores]:
We can see that:
  • CLQ makes no effort to amortize costs, and its RTT grows steadily with the burst size.
  • BQ starts off as the worst, but somewhere between burst size 16 and 32 the benefits of its design start to show.
  • FF2 and CAQ demonstrate similar growth patterns, with the cost per item dropping as the burst size grows. But FF2 remains the clear winner; at burst size 32 the round trip is 640ns.
  • 640ns for 32 messages averages out at just 20ns per message. This is NOT A MEANINGFUL LATENCY NUMBER! :) 

Latency != Time/Throughput

Let me elaborate on this last point. The average cost is not a good representation of the latency. Why not? Latency in this case is the round trip time from each message's perspective; throughput in a concurrent environment is not an indicator of latency, because it assumes a uniformity that is just not there in real life. We know the behaviour for the first message is at best the same, so assuming the first message's latency is 230ns, how would the other messages' timeline need to look to make the mean latency 20ns? Is that reasonable? What is really happening here is as follows (my understanding in any case):
  • The producer pretty much writes all the messages into the queue in one go, within a few nanoseconds of the first and last message being written. So if we mark T1..T32 as the send times of the messages, we can estimate T32 - T1 is within 100ns-150ns (a single-threaded read/write to FF2 takes roughly 6ns; assume an even split between write and read and add some margin). Near-empty-queue contention can add to this cost, as can any direct contention between poll/offer.
  • In fact, assuming a baseline cost of 6ns per read/write, multiplied by the chain length, leads to a base cost of 32*6*2=384ns. It follows that the minimum latency experienced by any message can never be less than 32*3=96ns, as the burst must be fully sent before messages can be received. 96ns assumes zero cost for the link; this is how fast it would go offering and polling from the same queue in the same thread.
  • With any luck, and depending on the size of the burst, the consumer will see all the messages written by the time it detects data in the queue (i.e. it won't contend with the producer). There is an opportunity here for reduced cost, which FF2/CAQ and BQ exploit.
  • The consumer now reads from the queue; the first message is detected as it would be for burst size 1, i.e. after 115ns or so for FF2. This can only get worse with the contention of other messages potentially being written as the first message is read, adding further latency. The rest of the messages are then read, and there is again an opportunity to lower costs. The messages are written into the outbound queue as soon as they are read. Reading and writing each message has a cost, which we can assume is at the very least 6ns.
  • The producer now detects the messages coming back. Let's mark the return times for the messages R1-R32. We know R1 - T1 is at least 230ns and we know that R32 - T1 is roughly 640ns. Assuming some fixed costs for writing to/reading from the queue, we can estimate that R32 - R1 is also within 100-150ns.
  • So, working through the timeline as estimated above gives us the following best case scenario for message 32:
    • 3ns     - Source sends message 1
    • 96ns   -  Source sends message 32
    • 115ns - Link picks up message number 1, has to work through messages 1-31
    • 230ns - Source picks up message 1
    • 301ns - Link is done transferring 31 messages and is finally picking up message 32
    • 307ns - Link has sent message 32
    • 346ns - Source has finished processing message 32
  • In this hypothetical best case scenario, message 32, which had the best chance of improved latency, enjoyed a 150ns RTT. This is the theoretical best case ever. IT DIDN'T HAPPEN. The RTT we saw for a 32-message burst was 640ns, indicating some contention and an imperfect world. A world in which there's every chance message 32 didn't have a latency as good as message 1.
  • The fallacy of the average latency is that it assumes a uniform distribution of the send and receive times, but the reality is that the groups are probably lumped together on the timeline, leading to many send times being within a few ns of each other, and many receive times being within a few ns of each other. This is the magic of buffering/queuing (a small arithmetic illustration follows this list).
  • The latency experienced by the last element in a burst can be better than the latency experienced by the first, but not significantly so and subject to some very delicate timing. The important thing here is that good queues allow the producer to make progress as the consumer idles or is busy consuming previous items, thus allowing us to capitalize on concurrent processing.
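To put some purely illustrative numbers on that fallacy, here is a tiny model of the lumped timeline estimated above: sends ~3ns apart, returns lumped (linearly, for simplicity) between the ~230ns first pickup and the ~346ns completion, contrasted with the measured 640ns burst RTT. None of this is measured data; it just restates the estimates above:

// Illustrative arithmetic only: contrasts 'burst RTT / burst size' with a per-message RTT
// estimate derived from the hypothetical best-case timeline above. Not measured data.
public class AverageLatencyFallacy {
    public static void main(String[] args) {
        int burst = 32;
        double burstRtt = 640.0; // measured round trip for the whole burst, in ns
        System.out.printf("burst RTT / burst size = %.1f ns 'per message' (not a latency!)%n",
                burstRtt / burst);

        // best-case timeline: sends lumped ~3ns apart, returns lumped between ~230ns and ~346ns
        double sumRtt = 0;
        for (int i = 0; i < burst; i++) {
            double sendTime = 3.0 + i * 3.0;
            double returnTime = 230.0 + i * (346.0 - 230.0) / (burst - 1);
            sumRtt += returnTime - sendTime;
        }
        System.out.printf("mean per-message RTT (best case estimate) = %.1f ns%n", sumRtt / burst);
    }
}

Even in this generous model no individual message sees anything remotely close to a 20ns round trip.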

To Be Continued...

There's a lot of experimentation and data to be had here, so in the interest of making progress with the project as well as sharing the results, I'll cut this analysis short and continue some other time. To be explored in further posts:
  • Compare sparse data impact on CAQ/FF2 for the bursty use case - I have done some experimentation which suggests FF2 gains little from the sparse data setting, while for CAQ the improvements are noticeable for certain burst sizes.
  • Compare average time results with the sampling mode results - There are significant differences in the results, which I'm still trying to explain. One would expect the sampling mode to have some fixed overhead from having to call System.nanoTime() twice for each sample, but the difference I see is a 2x increase in the mean.
  • Compare queue behaviour for different chain lengths - The results are unstable at the moment because the thread layout is not fixed. For chain lengths 2-4 the topology can be made static, but from chain length 5 and up the number of cross-core crossings can change mid-run (on my hyper-threaded quad core, anyway), leading to great variance in results (which is legitimate, but annoying). I will need to pin threads and thread 'roles' to stabilize the benchmark for this use case, probably using JTA (Java Thread Affinity - by Sir Lawrey).
The overall result is pointing to FF2 as the clear winner of the fastest queue competition, but the variety is educational and different designs are useful for different things. This also strengthens my conviction that a requirement based factory for queues serves your code much better than naming the queue flavour you feel like as you code. All the more reason to invest more time into JAQ :-).

Thanks to Darach, Norman, and Peter for reviewing the post, and Aleksey for reviewing the benchmark. Peer review FTW!

Monday 2 December 2013

Announcing the JAQ (Java Alternative Queues) Project


{This post is part of a long running series on lock free queues, checkout the full index to get more context here}
To quote Confucius:
"To learn and then practice it time and again is a pleasure, is it not? To have friends come from afar to share each others learning is a pleasure, is it not? To be unperturbed when not appreciated by others is gentlemanly, is it not?" - Analects 1:1
It is obvious to me the old man was talking about open source software, where we repeat what we learn, share with friends from afar, and try and behave when no one seems to get it. In this spirit I am going to try and apply lessons learnt and put together a concurrent queues library for Java - Java Alternative Queues.
It's early stages, but at this point I would value some feedback on:
  1. Intentions
  2. Interfaces and usability
  3. Project roadmap

Intentions

Where concurrent queues are concerned, it is my opinion that the JDK offering has been robust, but too generic to benefit from the performance gains offered by a more explicit declaration of requirements. JAQ would tackle this by providing queues through a requirements-focused factory interface, allowing the user to specify upfront:
  1. Number of producers/consumers
  2. Growth: Bounded/Unbounded
  3. Ordering (FIFO/other)
  4. Size
  5. Prefer throughput/latency
For a wider taxonomy of queues, see the excellent analysis on 1024cores.net. At this point all the queues I plan to implement are non-blocking and lock-free, as my main interest lies in that direction, but many of the principles may hold for blocking implementations and those may get added in the future.

Interfaces and Usability

I like the idea of separating several entities here:
  • ConcurrentQueueFactory - Tell me what you need, through a ConcurrentQueueSpec.
  • ConcurrentQueue - The queue data structure, provided by the factory. At the moment it does nothing but hand out producer/consumer instances. This is where pesky methods such as size/contains may end up. I'm not keen on supporting the full Queue interface so feedback on what people consider essential will be good.
  • ConcurrentQueueConsumer - A consumer interface into the queue, provided by the queue. I'm planning to support several consumption styles.
  • ConcurrentQueueProducer - A producer interface into the queue, provided by the queue.
The producer/consumer instances are thread-specific, and the appropriate thread should call into the queue provider method. Here is the old QueuePerfTest converted to use the new interface (I cleared out the irrelevant cruft for timing this and that):
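As the code embed doesn't carry over here, below is a purely hypothetical sketch of what such a converted test might look like. The type names come from the list above, but the factory and handle methods (createBoundedSpsc, createQueue, producer(), consumer(), offer, poll) are my guesses, not the actual JAQ interfaces, and imports of the JAQ types are omitted since the package layout is still in flux:

// Hypothetical usage sketch only; method/constructor shapes are assumptions, not the real JAQ API.
public class QueuePerfTestSketch {
    private static final int REPETITIONS = 50 * 1000 * 1000;
    private static final Integer TEST_VALUE = 777;

    public static void main(String[] args) throws InterruptedException {
        // assumed spec: 1 producer, 1 consumer, bounded capacity, FIFO, prefer throughput
        ConcurrentQueueSpec spec = ConcurrentQueueSpec.createBoundedSpsc(32 * 1024);
        final ConcurrentQueue<Integer> queue = ConcurrentQueueFactory.createQueue(spec);

        Thread producerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // the producer handle is obtained on the producing thread
                ConcurrentQueueProducer<Integer> producer = queue.producer();
                for (int i = 0; i < REPETITIONS; i++) {
                    while (!producer.offer(TEST_VALUE)) {
                        Thread.yield(); // queue is full, back off
                    }
                }
            }
        });
        producerThread.start();

        // the consumer handle is obtained on the consuming thread (main, in this case)
        ConcurrentQueueConsumer<Integer> consumer = queue.consumer();
        for (int i = 0; i < REPETITIONS; i++) {
            while (consumer.poll() == null) {
                Thread.yield(); // queue is empty, back off
            }
        }
        producerThread.join();
    }
}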

I realize this goes against the current Queue interface, but part of the whole point here is that the more we know about the calling code the better performance/correctness we can hope to offer.

Roadmap

I'd like to tackle the work in roughly the following order:
  • Specify/document/settle on high level interfaces (initial cut done)
  • SPSC implementations/tests/benchmarks (good bounded SPSC implementation is done, some benchmarks)
  • MPSC implementations/tests/benchmarks (some bounded MPSC variants are included but not integrated)
  • SPMC implementations/tests/benchmarks (not started)
  • MPMC implementations/tests/benchmarks (not started)
There's more I want to do in this area, but the above will keep me busy for a while so I'll start with that and increase the scope when it reaches a satisfying state.
I'm using JMH (and getting valuable support from @shipilev) for benchmarking the queues and hoping to use JCStress to test multi-threaded correctness. 

Contributors/Interested Parties

I know I'll be using this library in the near future for a few projects, but I hope it will be generally useful, so your feedback, comments and observations are very welcome. I've not been involved much in open-source projects before, so any advice on project setup is also welcome. Finally, if you feel like wading in and cutting some code, adding some tests or benchmarks, reporting some bugs or expressing interest in features: BRING IT ON :-) Pull requests are very welcome.