25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
29#include <condition_variable>
84template <
typename T,
class Container = std::deque<T>>
100 mutable std::mutex lock_;
102 std::condition_variable notEmptyCond_;
104 std::condition_variable notFullCond_;
111 std::queue<T, Container> que_;
114 using guard = std::lock_guard<std::mutex>;
116 using unique_guard = std::unique_lock<std::mutex>;
119 bool is_done()
const {
120 return closed_ && que_.empty();
180 notFullCond_.notify_all();
181 notEmptyCond_.notify_all();
210 while (!que_.empty())
212 notFullCond_.notify_all();
221 unique_guard g{lock_};
222 notFullCond_.wait(g, [
this] {
return que_.size() < cap_ || closed_; });
225 que_.emplace(std::move(val));
226 notEmptyCond_.notify_one();
236 if (que_.size() >= cap_ || closed_)
239 que_.emplace(std::move(val));
240 notEmptyCond_.notify_one();
252 template <
typename Rep,
class Period>
254 unique_guard g{lock_};
255 bool to = !notFullCond_.wait_for(
257 [
this] {
return que_.size() < cap_ || closed_; }
262 que_.emplace(std::move(val));
263 notEmptyCond_.notify_one();
276 template <
class Clock,
class Duration>
278 value_type val,
const std::chrono::time_point<Clock, Duration>& absTime
280 unique_guard g{lock_};
281 bool to = !notFullCond_.wait_until(
283 [
this] {
return que_.size() < cap_ || closed_; }
289 que_.emplace(std::move(val));
290 notEmptyCond_.notify_one();
303 unique_guard g{lock_};
304 notEmptyCond_.wait(g, [
this] {
return !que_.empty() || closed_; });
308 *val = std::move(que_.front());
310 notFullCond_.notify_one();
320 unique_guard g{lock_};
321 notEmptyCond_.wait(g, [
this] {
return !que_.empty() || closed_; });
327 notFullCond_.notify_one();
346 *val = std::move(que_.front());
348 notFullCond_.notify_one();
361 template <
typename Rep,
class Period>
366 unique_guard g{lock_};
367 notEmptyCond_.wait_for(
369 [
this] {
return !que_.empty() || closed_; }
375 *val = std::move(que_.front());
377 notFullCond_.notify_one();
390 template <
class Clock,
class Duration>
392 value_type* val,
const std::chrono::time_point<Clock, Duration>& absTime
397 unique_guard g{lock_};
398 notEmptyCond_.wait_until(
399 g, absTime, [
this] {
return !que_.empty() || closed_; }
404 *val = std::move(que_.front());
406 notFullCond_.notify_one();
Definition thread_queue.h:43
queue_closed()
Definition thread_queue.h:45
Definition thread_queue.h:86
typename Container::size_type size_type
Definition thread_queue.h:93
T value_type
Definition thread_queue.h:91
size_type size() const
Definition thread_queue.h:167
void capacity(size_type cap)
Definition thread_queue.h:159
bool done() const
Definition thread_queue.h:200
Container container_type
Definition thread_queue.h:89
bool try_put(value_type val)
Definition thread_queue.h:234
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:362
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:253
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:96
bool try_get(value_type *val)
Definition thread_queue.h:338
thread_queue()
Definition thread_queue.h:128
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:277
void clear()
Definition thread_queue.h:208
bool closed() const
Definition thread_queue.h:190
bool get(value_type *val)
Definition thread_queue.h:299
bool empty() const
Definition thread_queue.h:141
void close()
Definition thread_queue.h:177
size_type capacity() const
Definition thread_queue.h:149
thread_queue(size_t cap)
Definition thread_queue.h:135
void put(value_type val)
Definition thread_queue.h:220
value_type get()
Definition thread_queue.h:319
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:391
Definition async_client.h:60