Scarab  v3.8.0
Project 8 C++ Utility Library
concurrent_queue.hh
Go to the documentation of this file.
1 /*
2  * concurrent_queue.hh
3  *
4  * Created on: July 8, 2015
5  * Author: nsoblath
6  *
7  * FIFO Queue
8  *
9  * Based almost exactly on the class concurrent_queue from:
10  * http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
11  * Post author: Anthony Williams
12  * Copyright 2005-2013 Just Software Solutions Ltd. All rights reserved.
13  */
14 
15 #ifndef SCARAB_CONCURRENT_QUEUE_HH_
16 #define SCARAB_CONCURRENT_QUEUE_HH_
17 
18 #include "logger.hh"
19 
20 #include <chrono>
21 #include <condition_variable>
22 #include <deque>
23 #include <mutex>
24 
25 namespace scarab
26 {
27  LOGGER( slog_cq, "concurrent_queue" );
28 
29  template< class XDataType >
31  {
32  public:
33  typedef std::deque< XDataType > queue;
34 
36  {
37  queue& f_queue;
38  queue_not_empty( queue& a_queue ) :
39  f_queue( a_queue )
40  {}
41  bool operator()() const
42  {
43  return ! f_queue.empty();
44  }
45  };
46 
47  typedef std::unique_lock< std::mutex > cq_lock;
48 
49  public:
51  f_queue(),
52  f_interrupt( false ),
53  f_timeout( std::chrono::milliseconds( 1000 ) ),
54  f_mutex(),
56  {
57  }
58 
60  {
61  f_queue.clear();
62  }
63 
64  private:
65  queue f_queue;
67 
68  std::chrono::milliseconds f_timeout;
69 
70  mutable std::mutex f_mutex;
71  std::condition_variable f_condition_var;
72 
73  public:
74  void push( XDataType const& a_data )
75  {
76  LDEBUG( slog_cq, "Attempting to push to queue" );
77  cq_lock lock( f_mutex );
78  LDEBUG( slog_cq, "Pushing to concurrent queue; size: " << f_queue.size() );
79  f_queue.push_back( a_data );
80  lock.unlock();
81  f_condition_var.notify_one();
82  return;
83  }
84 
85  bool empty() const
86  {
87  cq_lock lock( f_mutex );
88  return f_queue.empty();
89  }
90 
91  unsigned size() const
92  {
93  cq_lock lock( f_mutex );
94  return f_queue.size();
95  }
96 
97  bool try_pop( XDataType& a_popped_value )
98  {
99  cq_lock lock( f_mutex );
100  f_interrupt = false;
101  if( f_queue.empty() )
102  {
103  return false;
104  }
105 
106  a_popped_value = f_queue.front();
107  f_queue.pop_front();
108  return true;
109  }
110 
111  bool wait_and_pop( XDataType& a_popped_value )
112  {
113  cq_lock lock( f_mutex );
114  f_interrupt = false;
115  f_condition_var.wait( lock, queue_not_empty( f_queue ) );
116  if( f_interrupt )
117  {
118  f_interrupt = false;
119  return false;
120  }
121 
122  a_popped_value = f_queue.front();
123  f_queue.pop_front();
124  LDEBUG( slog_cq, "Popping from concurrent queue; size: " << f_queue.size() );
125  return true;
126  }
127 
128  bool timed_wait_and_pop( XDataType& a_popped_value )
129  {
130  cq_lock lock( f_mutex );
131  f_interrupt = false;
132  std::chrono::steady_clock::time_point const waitUntil = std::chrono::steady_clock::now() + f_timeout;
133  if( ! f_condition_var.wait_until( lock, waitUntil, queue_not_empty( f_queue ) ) )
134  {
135  //LDEBUG( mtlog_cq, "Queue wait has timed out" );
136  return false;
137  }
138  if( f_interrupt )
139  {
140  f_interrupt = false;
141  return false;
142  }
143 
144  a_popped_value = f_queue.front();
145  f_queue.pop_front();
146  LDEBUG( slog_cq, "Popping from concurrent queue; size: " << f_queue.size() );
147  return true;
148  }
149 
150  void interrupt()
151  {
152  f_interrupt = true;
153  f_condition_var.notify_one();
154  return;
155  }
156 
157  inline unsigned get_timeout() const
158  {
159  return f_timeout.count();
160  }
161 
162  inline void set_timeout( unsigned a_duration )
163  {
164  f_timeout = std::chrono::milliseconds( a_duration );
165  return;
166  }
167  };
168 
169 } /* namespace scarab */
170 
171 #endif /* SCARAB_CONCURRENT_QUEUE_HH_ */
unsigned get_timeout() const
STL namespace.
LOGGER(mtlog, "authentication")
Contains the logger class and macros, based on Kasper's KLogger class.
void set_timeout(unsigned a_duration)
bool timed_wait_and_pop(XDataType &a_popped_value)
bool wait_and_pop(XDataType &a_popped_value)
#define LDEBUG(...)
Definition: logger.hh:389
bool try_pop(XDataType &a_popped_value)
std::condition_variable f_condition_var
std::deque< XDataType > queue
void push(XDataType const &a_data)
std::chrono::milliseconds f_timeout
std::mutex f_mutex
Timeout duration in milliseconds.
std::unique_lock< std::mutex > cq_lock