#ifndef __CHANNEL_H__
#define __CHANNEL_H__
#include <condition_variable>
#include <iostream>
#include <list>
#include <mutex>
#include <stdexcept>
#include <system_error>
#include <thread>
#include "packet.h"
using std::cout;
using std::endl;
//#define DEBUG 1
// FIFO channel of raw `item*` pointers that can be shared between threads.
//
// A single mutex (m_q) guards both the queue and the `closed` flag, and both
// condition variables wait on that same mutex. This makes every
// "check state, then modify the queue" sequence atomic, so concurrent
// consumers cannot pop from an empty queue and concurrent producers cannot
// exceed the capacity bound.
//
// The channel stores the pointers exactly as given; its destructor does not
// delete any items that are still queued.
template<class item>
class channel {
public:
    // queue_max: maximum number of queued items. Values <= 0 (the default is
    // -1) disable the bound.
    channel(int queue_max = -1) : closed(false), queue_max(queue_max) { }

    ~channel() {
        std::cout << "delete channel" << std::endl;
    }

    // Returns the number of items currently queued.
    int size() {
        std::lock_guard<std::mutex> lock_q(m_q);
        return static_cast<int>(queue.size());
    }

    // Marks the channel as closed and wakes every thread blocked in put() or
    // get() so that it can observe the closed state.
    void close() {
        {
            std::lock_guard<std::mutex> lock_q(m_q);
            closed = true;
        }
        cv_qne.notify_all();
        cv_qnf.notify_all();
    }

    // Returns true once close() has been called.
    bool is_closed() {
        std::lock_guard<std::mutex> lock_q(m_q);
        return closed;
    }

    // Appends `in` to the back of the queue.
    //
    // in:   pointer to enqueue (stored as-is).
    // wait: when the queue is full, block until there is room or the channel
    //       is closed (true), or give up immediately (false).
    //
    // Returns true if the item was queued, false if the queue was full and
    // `wait` was false.
    // Throws std::logic_error if the channel is closed.
    bool put(item* &in, bool wait = true) {
#ifdef DEBUG
        std::cout << "before checking queue size in put" << std::endl;
#endif
        std::unique_lock<std::mutex> lock_q(m_q);
        if (is_full()) {
            if (!wait) {
                return false;
            }
            cv_qnf.wait(lock_q, [this]() -> bool { return closed || !is_full(); });
        }
#ifdef DEBUG
        std::cout << "after checking queue size in put" << std::endl;
#endif
        if (closed) {
            throw std::logic_error("put to closed channel");
        }
        queue.push_back(in);
        // Release the lock before notifying so the woken consumer does not
        // immediately block on the mutex we still hold.
        lock_q.unlock();
#ifdef DEBUG
        std::cout << "before notifying get in put" << std::endl;
#endif
        cv_qne.notify_one();
#ifdef DEBUG
        std::cout << "after notifying get in put" << std::endl;
#endif
        return true;
    }

    // Removes the item at the front of the queue and stores it in `out`.
    //
    // out:  receives the dequeued pointer; left untouched when no item is
    //       returned.
    // wait: when the queue is empty, block until an item arrives or the
    //       channel is closed (true), or give up immediately (false).
    //
    // Returns true if an item was dequeued, false if the queue was empty and
    // `wait` was false.
    // Throws std::logic_error if the channel is closed; this applies even if
    // items are still queued.
    bool get(item* &out, bool wait = true) {
#ifdef DEBUG
        std::cout << "before checking queue size in get" << std::endl;
#endif
        std::unique_lock<std::mutex> lock_q(m_q);
        if (wait) {
            cv_qne.wait(lock_q, [this]() -> bool { return closed || !queue.empty(); });
        }
        else if (queue.empty()) {
            return false;
        }
#ifdef DEBUG
        std::cout << "after checking queue size in get" << std::endl;
#endif
        if (closed) {
            throw std::logic_error("channel is closed");
        }
        out = queue.front();
        queue.pop_front();
        // Release the lock before notifying a producer waiting for room.
        lock_q.unlock();
        cv_qnf.notify_one();
        return true;
    }

private:
    // True when a capacity bound is set and has been reached.
    // The caller must hold m_q.
    bool is_full() const {
        return queue_max > 0 &&
               queue.size() >= static_cast<typename std::list<item*>::size_type>(queue_max);
    }

    std::list<item*> queue;          // pending items, oldest at the front
    std::mutex m_q;                  // guards queue and closed
    std::condition_variable cv_qne;  // signalled when the queue becomes non-empty, or on close
    std::condition_variable cv_qnf;  // signalled when the queue gains room, or on close
    bool closed;
    int queue_max;
};
// Convenience alias: a channel that carries pointers to `packet` objects.
using packet_channel = channel<packet>;
#endif
// (Residual blog-page text, not part of the header: "No comments: Post a Comment")