Thread safe queue in C++
In this article, you will learn about the thread safe queue in C++ with several examples.
Introduction:
A thread-safe queue in C++ is a data structure that allows multiple threads to insert and remove elements concurrently without data races or corrupted state. std::queue on its own gives no such guarantee: if one thread modifies it while another thread reads or modifies it, the behavior is undefined, so every access to the shared queue must be synchronized.
In C++, you can implement a thread-safe queue using several synchronization techniques. One of the most common ways is to guard the queue with a mutex (short for "mutual exclusion") and to add a condition variable so that consumers can sleep until an item arrives.
Program:
The following program demonstrates a thread-safe queue in C++:
#include <chrono>              // std::chrono::milliseconds
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

template <typename T>
class ThreadSafeQueue {
public:
    // Constructor
    ThreadSafeQueue() {}

    // Push an element onto the queue
    void Push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(item);
        condition_.notify_one(); // Notify one waiting thread that an item is available.
    }

    // Pop an element from the queue, blocking while the queue is empty
    T Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Wait until the queue is not empty
        condition_.wait(lock, [this] { return !queue_.empty(); });
        T item = queue_.front();
        queue_.pop();
        return item;
    }

    // Check if the queue is empty
    bool Empty() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condition_;
};

int main() {
    ThreadSafeQueue<int> queue;

    // Create a producer thread
    std::thread producer([&]() {
        for (int i = 0; i < 10; ++i) {
            queue.Push(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    });

    // Create a consumer thread
    std::thread consumer([&]() {
        for (int i = 0; i < 10; ++i) {
            int item = queue.Pop();
            std::cout << "Popped: " << item << std::endl;
        }
    });

    producer.join();
    consumer.join();
    return 0;
}
Output:
Popped: 0
Popped: 1
Popped: 2
Popped: 3
Popped: 4
Popped: 5
Popped: 6
Popped: 7
Popped: 8
Popped: 9
Explanation:
1. Header and Library Includes:
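In this example, the code includes the C++ standard library headers for console output (<iostream>), threads (<thread>), queues (<queue>), mutexes (<mutex>), and condition variables (<condition_variable>). The delay in the producer uses std::chrono::milliseconds, which is declared in <chrono>.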
2. ThreadSafeQueue Class Definition:
After that, the code defines a template class called ThreadSafeQueue. It's a thread-safe queue that can hold elements of type T.
3. Push Method:
- The Push method allows you to add an element of type T to the queue in a thread-safe manner.
- It locks a mutex (mutex_) using std::lock_guard to ensure exclusive access to the queue while pushing an item.
- After that, it pushes the item onto the queue using the std::queue data structure.
- After pushing the item, it notifies one waiting thread (if any) that an item is available using a condition variable (condition_).
4. Pop Method:
- The Pop method retrieves and removes an element from the queue in a thread-safe manner.
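- It locks the mutex using std::unique_lock rather than std::lock_guard, because the condition variable must be able to unlock and relock the mutex while the thread waits.
- After that, it waits (blocks) on the condition variable until the queue is not empty. While the thread is blocked, condition_.wait releases the mutex so that producers can push. The lambda function passed to condition_.wait is the predicate: it is evaluated with the mutex held, and the wait returns only once it yields true, which also guards against spurious wakeups.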
- Once the condition is satisfied (i.e., the queue is not empty), it retrieves the front element, pops it from the queue, and returns the item.
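The predicate form of wait used in Pop behaves like a loop that re-checks the condition after every wakeup. The sketch below spells that loop out. PopWithExplicitLoop and DemoExplicitLoop are hypothetical names introduced only for this illustration, and the helper works on a plain std::queue<int> rather than on the ThreadSafeQueue class.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical helper (not part of ThreadSafeQueue): pops one int using an
// explicit loop instead of the predicate overload of wait().
int PopWithExplicitLoop(std::queue<int>& q, std::mutex& m,
                        std::condition_variable& cv) {
    std::unique_lock<std::mutex> lock(m);
    // Re-check after every wakeup: being woken does not by itself
    // guarantee that the queue holds an item.
    while (q.empty()) {
        cv.wait(lock);  // releases the mutex while blocked, relocks on wakeup
    }
    assert(!q.empty());  // holds here because the mutex is locked again
    int item = q.front();
    q.pop();
    return item;
}

// Demonstration: a producer thread pushes 42 while the caller waits for it.
int DemoExplicitLoop() {
    std::queue<int> q;
    std::mutex m;
    std::condition_variable cv;
    std::thread producer([&] {
        {
            std::lock_guard<std::mutex> lock(m);
            q.push(42);
        }
        cv.notify_one();
    });
    int value = PopWithExplicitLoop(q, m, cv);
    producer.join();
    return value;
}
```

If the producer happens to run before the consumer starts waiting, the loop condition is already false and wait is never called, so the notification cannot be missed.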
5. Empty Method:
- The Empty method checks whether the queue is empty in a thread-safe manner.
- It locks the mutex to ensure exclusive access to the queue and then checks if the queue is empty. The result is only a snapshot: another thread may push or pop immediately after Empty returns.
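Empty does not modify the queue, so it could be declared const. The sketch below shows one way to do that, assuming the mutex is marked mutable so that const member functions can lock it. ConstCorrectQueue, Size, and DemoConstAccess are hypothetical names that do not appear in the program above.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>

// Hypothetical variant with const-qualified observers.
template <typename T>
class ConstCorrectQueue {
public:
    void Push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(item);
    }

    // Snapshot only: the answer may be outdated as soon as the lock is released.
    bool Empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    std::size_t Size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mutex_;  // mutable so that const members can lock it
};

// Demonstration: observers are callable through a const reference.
bool DemoConstAccess() {
    ConstCorrectQueue<int> queue;
    const ConstCorrectQueue<int>& view = queue;
    assert(view.Size() == 0);
    bool was_empty = view.Empty();
    queue.Push(1);
    return was_empty && !view.Empty() && view.Size() == 1;
}
```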
6. main Function:
In the main function, we create an instance of the ThreadSafeQueue class, specialized for int elements.
7. Producer Thread:
The producer thread is created and is responsible for pushing integers from 0 to 9 onto the thread-safe queue. It simulates some delay between pushes using std::this_thread::sleep_for.
8. Consumer Thread:
The consumer thread is created and is responsible for popping items from the thread-safe queue and printing them to the console.
9. Join Threads:
The main function waits for both the producer and consumer threads to finish their work before exiting.
10. Return Statement:
The main function returns 0 to indicate successful execution.
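The consumer in this program works because it knows in advance that exactly ten items will arrive. If it tried to pop an eleventh item, it would block forever. One possible way to lift that restriction is a "closed" flag, sketched below. ClosableQueue, Close, and DrainUntilClosed are hypothetical names introduced for this illustration, and Pop here reports success through a bool instead of returning the item.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical variant of the queue with a Close() method.
template <typename T>
class ClosableQueue {
public:
    void Push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(item));
        }
        condition_.notify_one();
    }

    // Marks the queue as finished and wakes every waiting consumer.
    void Close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        condition_.notify_all();
    }

    // Blocks until an item is available or the queue is closed and drained.
    // Returns false in the latter case and leaves `out` untouched.
    bool Pop(T& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        condition_.wait(lock, [this] { return closed_ || !queue_.empty(); });
        if (queue_.empty()) {
            return false;  // closed and nothing left to deliver
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condition_;
    bool closed_ = false;
};

// Demonstration: the consumer does not know how many items will arrive.
std::vector<int> DrainUntilClosed() {
    ClosableQueue<int> queue;
    std::vector<int> received;
    std::thread consumer([&] {
        int item = 0;
        while (queue.Pop(item)) {
            received.push_back(item);
        }
    });
    for (int i = 0; i < 5; ++i) {
        queue.Push(i);
    }
    queue.Close();
    consumer.join();
    assert(received.size() == 5);  // items pushed before Close are still delivered
    return received;
}
```

Close uses notify_all rather than notify_one so that every blocked consumer wakes up and can observe the flag.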
Complexity analysis:
Push Method:
Time Complexity: O(1)
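Locking the mutex, pushing an item onto the queue, and notifying a waiting thread do not depend on the size of the queue. The cost of copying one element of type T is included, and a thread may additionally have to wait for the mutex while another thread holds it.
Space Complexity: O(1) per call
Each call uses a constant amount of extra space: the std::lock_guard object and one new element stored in the queue. The queue as a whole occupies O(n) space for n stored elements, because it is unbounded and grows with every Push that is not matched by a Pop.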
Pop Method:
Time Complexity: O(1) of work, excluding time spent waiting
Locking the mutex, checking if the queue is empty, and popping an item are constant-time operations once an item is available. The time spent blocked in wait is a different matter: it depends on when a producer pushes the next item and is not bounded by the size of the queue or by any constant.
Space Complexity: O(1)
Pop uses a constant amount of extra space: the lock object and one local copy of the returned element.
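Because Pop can block for an unbounded time, some callers prefer to give up after a deadline. The sketch below shows one way to do that with std::condition_variable::wait_for. TimedQueue, PopFor, and DemoTimedPop are hypothetical names introduced here and are not part of the program above.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical extension: a queue whose pop gives up after a timeout.
template <typename T>
class TimedQueue {
public:
    void Push(T item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(item));
        }
        condition_.notify_one();
    }

    // Waits at most `timeout` for an item. Returns true and fills `out`
    // on success, or returns false if the queue stayed empty.
    template <typename Rep, typename Period>
    bool PopFor(T& out, const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // With a predicate, wait_for returns the predicate's final value.
        if (!condition_.wait_for(lock, timeout,
                                 [this] { return !queue_.empty(); })) {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable condition_;
};

// Demonstration: the first call times out, the second one succeeds.
bool DemoTimedPop() {
    TimedQueue<int> queue;
    int item = 0;
    // Nothing has been pushed yet, so this call times out.
    bool timed_out = !queue.PopFor(item, std::chrono::milliseconds(20));
    queue.Push(7);
    bool got_item = queue.PopFor(item, std::chrono::milliseconds(20));
    assert(got_item);
    return timed_out && item == 7;
}
```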
Empty Method:
Time Complexity: O(1)
Locking the mutex and checking if the queue is empty are both constant time operations.
Space Complexity: O(1)
The space complexity is constant.
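Approach 2: Mutex-based Synchronization:
This variant drops the condition variable and relies on a mutex alone. Pop no longer blocks: if the queue is empty, it throws an exception instead of waiting.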
Program:
The following program demonstrates mutex-based synchronization in C++:
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>   // std::runtime_error
#include <thread>

template <typename T>
class MutexThreadSafeQueue {
public:
    // Constructor
    MutexThreadSafeQueue() {}

    // Push an element onto the queue
    void Push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(item);
    }

    // Pop an element from the queue; throws if the queue is empty
    T Pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            throw std::runtime_error("Queue is empty");
        }
        T item = queue_.front();
        queue_.pop();
        return item;
    }

    // Check if the queue is empty
    bool Empty() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
};

int main() {
    MutexThreadSafeQueue<int> myQueue;

    // Push elements onto the queue
    myQueue.Push(1);
    myQueue.Push(2);
    myQueue.Push(3);

    // Pop elements from the queue
    while (!myQueue.Empty()) {
        std::cout << "Popped: " << myQueue.Pop() << std::endl;
    }
    return 0;
}
Output:
Popped: 1
Popped: 2
Popped: 3
Explanation:
1. Include Header Files:
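In this example, the code begins by including the necessary C++ standard library headers. These headers are used for input and output (<iostream>), using queues (<queue>), and implementing thread synchronization with mutexes (<mutex>). The <thread> header is also included, although this demonstration runs on a single thread and does not create any. std::runtime_error, which Pop throws, is declared in <stdexcept>; a portable program should include that header directly rather than rely on it arriving through another header.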
2. Define the MutexThreadSafeQueue Class:
template <typename T>
class MutexThreadSafeQueue {
public:
    // Constructor
    MutexThreadSafeQueue() {}

    // Push an element onto the queue
    void Push(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(item);
    }

    // Pop an element from the queue
    T Pop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            throw std::runtime_error("Queue is empty");
        }
        T item = queue_.front();
        queue_.pop();
        return item;
    }

    // Check if the queue is empty
    bool Empty() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
};
This code defines a C++ class template MutexThreadSafeQueue which represents a thread-safe queue. Here's what the class does:
- The constructor body is empty. The members are default-constructed, which yields an empty queue and an unlocked mutex.
- The Push function adds an element to the queue while protecting the operation with a mutex.
- The Pop function retrieves and removes an element from the queue, also protected by a mutex. It throws std::runtime_error if the queue is empty.
- The Empty function checks if the queue is empty, again within a mutex-protected context.
- It uses std::queue to store elements and std::mutex for synchronization.
3. Define the Main Function:
int main() {
    MutexThreadSafeQueue<int> myQueue;

    // Push elements onto the queue
    myQueue.Push(1);
    myQueue.Push(2);
    myQueue.Push(3);

    // Pop elements from the queue
    while (!myQueue.Empty()) {
        std::cout << "Popped: " << myQueue.Pop() << std::endl;
    }
    return 0;
}
In the main function:
- An instance of the MutexThreadSafeQueue class is created, specifying that it will store integers (int).
- Three elements (1, 2, and 3) are pushed onto the queue using the Push method.
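- A while loop is used to pop and print elements from the queue using the Pop method, and the loop continues until the queue is empty. Checking Empty and then calling Pop is safe here only because a single thread uses the queue. With several consumers, another thread could remove the last element between the two calls, and Pop would throw.
- The program ends by returning 0 from the main function.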
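The Empty-then-Pop loop in main works because only one thread touches the queue. When several consumers share the queue, the check and the removal need to happen under a single lock. The sketch below shows one way to do that. TryPopQueue, TryPop, and SumWithTwoConsumers are hypothetical names introduced for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical variant: the emptiness check and the pop share one critical section.
template <typename T>
class TryPopQueue {
public:
    void Push(T item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(item));
    }

    // Returns false if the queue is empty; otherwise moves the front
    // element into `out`, removes it, and returns true.
    bool TryPop(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) {
            return false;
        }
        out = std::move(queue_.front());
        queue_.pop();
        return true;
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
};

// Demonstration: two consumers drain the same queue without exceptions,
// and each item is taken exactly once.
int SumWithTwoConsumers() {
    TryPopQueue<int> queue;
    for (int i = 1; i <= 100; ++i) {
        queue.Push(i);
    }
    std::atomic<int> sum{0};
    auto drain = [&] {
        int item = 0;
        while (queue.TryPop(item)) {
            sum += item;
        }
    };
    std::thread a(drain);
    std::thread b(drain);
    a.join();
    b.join();
    int leftover = 0;
    assert(!queue.TryPop(leftover));  // the queue is empty afterwards
    return sum.load();
}
```

Since every item from 1 to 100 is consumed exactly once, SumWithTwoConsumers returns 5050 regardless of how the two threads interleave.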
Complexity analysis:
Push Operation:
Time Complexity: O(1)
The Push operation has a constant time complexity because it simply adds an element to the end of the underlying std::queue, and the std::queue class has constant-time insertion at the end.
Locking and unlocking an uncontended mutex is cheap and does not change the asymptotic complexity. Under contention, a thread may additionally wait until other threads release the mutex.
Space Complexity: O(1)
Each Push stores a copy of the item in the queue (not a reference to the caller's object), so a single call uses a constant amount of additional memory. The queue as a whole holds O(n) memory for n stored elements.
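Push(const T&) copies its argument into the queue. For element types that are expensive to copy, an additional overload taking an rvalue reference can move the element instead. The sketch below counts copies to make the difference visible. Tracked, MovingQueue, and CountCopies are hypothetical names introduced for this illustration.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical element type that counts how often it is copy-constructed.
struct Tracked {
    static int copies;
    Tracked() = default;
    Tracked(const Tracked&) { ++copies; }
    Tracked(Tracked&&) noexcept = default;
    Tracked& operator=(const Tracked&) = default;
    Tracked& operator=(Tracked&&) noexcept = default;
};
int Tracked::copies = 0;

// Hypothetical queue with both a copying and a moving Push.
template <typename T>
class MovingQueue {
public:
    void Push(const T& item) {  // lvalues: the element is copied
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(item);
    }

    void Push(T&& item) {       // rvalues: the element is moved
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(item));
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
};

// Demonstration (single-threaded, since the counter is a plain int).
int CountCopies() {
    Tracked::copies = 0;
    MovingQueue<Tracked> queue;
    Tracked a;
    queue.Push(a);             // lvalue: one copy
    assert(Tracked::copies == 1);
    queue.Push(Tracked{});     // temporary: moved, no copy
    Tracked b;
    queue.Push(std::move(b));  // explicit move, no copy
    return Tracked::copies;
}
```

Only the first call copies its argument, so CountCopies returns 1.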
Pop Operation:
Time Complexity: O(1)
Similar to the Push operation, the Pop operation has a constant time complexity. It removes an element from the front of the underlying std::queue, which is a constant-time operation.
Locking and unlocking the mutex have low overhead and don't significantly affect the time complexity.
Space Complexity: O(1)
The Pop operation also doesn't allocate additional memory based on the number of elements in the queue, so the space complexity is constant.
Empty Operation:
Time Complexity: O(1)
The Empty operation is a constant-time operation. It checks whether the underlying std::queue is empty, which is a constant-time operation.
Locking and unlocking the mutex have low overhead and don't significantly affect the time complexity.
Space Complexity: O(1)
The Empty operation does not allocate additional memory; it simply checks the state of the queue, so the space complexity is constant.