Here’s a really cool queue I wrote: https://github.com/FlovinTue/ConcurrentQueue
It began as a hobby project at the start of summer 2018, when I was frustrated that (for several reasons) I hadn't gotten to learn anything about threading during my first year at The Game Assembly. It was designed as an excuse to learn more about the subject.
In the beginning it was a simple bounded multi-producer-multi-consumer structure using a circular buffer and a whole bunch of atomics to control the flow. After using it for a while I realized I could make it a lot faster by dropping sequential consistency and splitting production over a series of buffers. Both of these versions can be viewed in the 'Discarded' folder of my GitHub repository as StaticMultiThreadQueue.h and DataShuttle.h, along with a whole bunch of failed ideas.
As I experimented with different ideas I realized I could combine the best of the previous versions (sequential consistency, but with separate producers) and get dynamic memory allocation to boot! This resulted in the final version, which was later expanded with exception safety and a good deal of optimization.
To explain the inner workings a little bit:
Internally, each producer keeps a slot in an array, holding a pointer to one (usually the front-most) of the buffers in its buffer list. A new producer begins by allocating its initial buffer and placing it in an appropriate slot.
Once inside the active buffer, it advances a slot iterator and, if that slot is marked as empty, inserts its data there.
If a slot is marked as non-empty, the push fails and the producer allocates a new buffer and pushes it to the front of its list (where the push is then guaranteed to succeed, barring an exception). No two producers ever share a buffer.
Consumers circle around the producer array searching for a producer that has contents. (The search begins from a private offset to keep things spread out a bit.)
When a consumer finds one with contents, it stores a 'shortcut' pointer directly to that buffer and continues dequeuing straight from the stored buffer until it fails.
Inside the buffer, when a consumer needs to dequeue an element, it first iterates an atomic counter that indicates whether an entry is actually available. Afterwards it claims an actual read slot using a second iterator variable.
As for the size method: it simply circles through all the buffer lists, collecting local sizes. The reason I am not using an atomic counter in the main structure is performance: a single counter becomes highly contended between all the consumers and producers, and noticeably hurts throughput.
To list some of its positives:
- It’s really fast. I compared it against a couple of similar queues (Boost, moodycamel, Microsoft) and it beats them all in both single- and multi-threaded performance (not counting moodycamel’s bulk operations).
- Basic exception safety.
- Wait-free pushing & lock-free popping.
Things to improve upon in the future:
- Currently there is no garbage collection: old, used-up buffers just sit around collecting virtual dust until they are destroyed with the rest of the structure.
- Recycling of producer slots. If a thread is killed, the array slot it occupies goes unused. This could be improved by adding a mechanism for packing the remaining producers together and decrementing the counter.
- Recycling of object IDs. As both producers and consumers rely on a local object ID to store local pointers, the arrays used must grow when encountering an ID higher than the current capacity. This could be made more efficient by recycling the IDs of destroyed queues.