26template <
typename T,
typename Queue>
32 template <
typename...
Args>
34 auto raw_ptr =
new Queue(std::forward<Args>(args)...);
41 const int id_to_return =
queues_.size();
42 queues_.push_back(std::move(queue));
46 void Push(
const int idx, T&& t) {
48 queues_[idx]->Push(std::move(t));
54 int q_id = selector();
57 CHECK(queue) <<
"queue must not be null.";
62 auto default_selector = [
this]() ->
int {
63 for (
int i = 0; i <
queues_.size(); i++) {
70 return Pop(default_selector);
79 CHECK(idx >= 0 && idx <
queues_.size()) <<
"queues_ array out of bound";
Definition: multiplexer.h:27
std::function< int(void)> QueueSelector
Definition: multiplexer.h:30
std::vector< QueuePtr > queues_
Definition: multiplexer.h:83
static QueuePtr CreateQueue(Args &&... args)
Definition: multiplexer.h:33
Semaphore sem_items_
Definition: multiplexer.h:82
void CheckIdx(const int idx)
Definition: multiplexer.h:78
QueuePtr null_ptr_
Definition: multiplexer.h:84
T Pop()
Definition: multiplexer.h:61
Multiplexer()
Definition: multiplexer.h:38
T Pop(QueueSelector selector)
Definition: multiplexer.h:52
int RegisterQueue(QueuePtr &&queue)
Definition: multiplexer.h:40
void Push(const int idx, T &&t)
Definition: multiplexer.h:46
bool IsEmpty(const int idx)
Definition: multiplexer.h:73
std::unique_ptr< Queue > QueuePtr
Definition: multiplexer.h:29
void SemWait()
Definition: multiplexer.h:75
Definition: semaphore.h:23
void SemPost()
Definition: semaphore.h:35
void SemWait()
Definition: semaphore.h:28
#define CHECK(x)
Definition: logging.h:251
Definition: alloc_utils.cpp:23
std::vector< std::string_view > Args
Definition: incremental.h:28