What is a Thread-Safe Collection?

People used to say that writing a collection that is mutable, thread safe and usable is an extremely difficult task. Since .NET 4 introduced the System.Collections.Concurrent namespace, multiple threads can safely and efficiently add or remove items from these collections without requiring additional synchronization in user code. When you write new code, use the concurrent collection classes whenever the collection will be written to by multiple threads concurrently. In this article I'm going to show you one of these thread-safe collections: ConcurrentQueue.

Concurrent collections in .NET

Concurrent collections in .NET work very much like their single-threaded counterparts, with the difference that they are thread safe. These collections can be used in scenarios where you need to share a collection between Tasks. They are strongly typed and use a lightweight synchronisation mechanism to ensure that they are safe and fast to use in parallel programming.
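For example, here is a minimal sketch (the variable names are just for illustration) of two tasks writing to a ConcurrentBag, another collection from the same namespace, without any explicit locking:

ConcurrentBag<int> sharedBag = new ConcurrentBag<int>();

// Two tasks add to the same collection concurrently; no lock is needed
Task firstWriter = Task.Factory.StartNew(() =>
{
    for (int i = 0; i < 1000; i++)
    {
        sharedBag.Add(i);
    }
});
Task secondWriter = Task.Factory.StartNew(() =>
{
    for (int i = 0; i < 1000; i++)
    {
        sharedBag.Add(i);
    }
});

Task.WaitAll(firstWriter, secondWriter);
Console.WriteLine(sharedBag.Count); // prints 2000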

Concurrent queues

The Queue<T> generic collection has a thread-safe counterpart called ConcurrentQueue<T>. Its most important methods:

  1. Enqueue(T element): adds an item of type T to the collection.
  2. TryPeek(out T): tries to retrieve the next element from the collection without removing it. If the method succeeds it returns true and sets the out parameter to the value; otherwise it returns false.
  3. TryDequeue(out T): tries to remove the first element. If the method succeeds it returns true, removes the item from the collection and sets the out parameter to the retrieved element; otherwise it returns false.

The ‘Try’ prefix in the method names implies that your code needs to be prepared for the case where the element cannot be retrieved. If multiple threads retrieve elements from the same queue, you cannot be sure what’s in there at the moment a specific thread tries to read from it.
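Here is a minimal single-threaded sketch of the ‘Try’ pattern (the queue and variable names are just for illustration):

ConcurrentQueue<string> messages = new ConcurrentQueue<string>();
messages.Enqueue("first message");

string peeked;
if (messages.TryPeek(out peeked))
{
    // The element is still in the queue; TryPeek does not remove it
    Console.WriteLine(peeked);
}

string dequeued;
if (messages.TryDequeue(out dequeued))
{
    // The element has now been removed from the queue
    Console.WriteLine(dequeued);
}
else
{
    // Another thread may have emptied the queue in the meantime
    Console.WriteLine("The queue was empty");
}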

Example

Declare and fill a concurrent queue:

ConcurrentQueue<int> concurrentQueue = new ConcurrentQueue<int>(); 
for (int i = 0; i < 5000; i++)
{
    concurrentQueue.Enqueue(i);
}

We’ll want to get the items from this collection and check, using a counter, whether all of them have been retrieved. The counter will also be shared among the threads, protected with the ‘lock’ technique we saw in this post – or actually with something similar to the ‘lock’ keyword: the Interlocked class. Interlocked has an Increment method which accepts a ref int parameter and increments the incoming integer in an atomic operation.
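To illustrate the difference, here is a minimal sketch (counter and counterLock are just illustrative names) of the two ways to increment a shared counter:

int counter = 0;
object counterLock = new object();

// With the 'lock' keyword: acquire the lock, increment, release the lock
lock (counterLock)
{
    counter++;
}

// With Interlocked: the increment itself is a single atomic operation
Interlocked.Increment(ref counter);

Back to our queue, here is the complete code: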

int itemCount = 0;
Task[] queueTasks = new Task[20];

for (int i = 0; i < queueTasks.Length; i++)
{
    queueTasks[i] = Task.Factory.StartNew(() =>
    {
        // Keep trying to de-queue as long as there appear to be items left in the collection
        while (concurrentQueue.Count > 0)
        {
            int currentElement;
            bool success = concurrentQueue.TryDequeue(out currentElement);
            if (success)
            {
                // Increment the shared counter atomically
                Interlocked.Increment(ref itemCount);
            }
        }
    });
}

The while loop will ensure that we’ll try to de-queue the items as long as there’s something left in the collection.
Wait for the tasks and print the number of items processed – the counter should have the same value as the number of items in the queue:


Task.WaitAll(queueTasks);
Console.WriteLine("Counter: {0}", itemCount);
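For completeness, the snippets in this post assume a console application with the following using directives at the top of the file:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;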