Sunday, October 12, 2014

Implementing a channel in C++

#ifndef __CHANNEL_H__
#define __CHANNEL_H__

#include <list>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <system_error>
#include "packet.h"

using std::cout;
using std::endl;

//#define DEBUG 1

/**
 * @brief Thread-safe FIFO channel that passes raw `item*` pointers between threads.
 *
 * Producers call put(), consumers call get(). The channel only stores the
 * pointers; it never dereferences or deletes them, so the pointees must be
 * kept alive and eventually released by the users of the channel.
 *
 * Synchronization: the queue and the closed flag are guarded by a single
 * mutex, and both condition variables wait on that same mutex. Checking a
 * condition ("not empty" / "not full" / "closed") and acting on it therefore
 * happen atomically, which rules out lost wake-ups, popping from an empty
 * queue when several consumers race, and exceeding the capacity when several
 * producers race.
 *
 * @tparam item Pointee type of the transported pointers.
 */
template<class item>
class channel {
public:
    /**
     * @brief Creates an open, empty channel.
     * @param queue_max Maximum number of queued pointers. Any value <= 0
     *                  (the default is -1) makes the channel unbounded.
     */
    channel(int queue_max = -1) : closed(false), queue_max(queue_max) { }

    /// Logs the destruction; queued pointers are not deleted.
    ~channel() {
        std::cout << "delete channel" << std::endl;
    }

    /// @return Number of pointers currently queued.
    int size() const {
        std::lock_guard<std::mutex> lock_q(m_q);
        return static_cast<int>(queue.size());
    }

    /**
     * @brief Marks the channel as closed and wakes every blocked put()/get().
     *
     * Woken callers observe the closed flag and throw std::logic_error.
     */
    void close() {
        {
            std::lock_guard<std::mutex> lock_q(m_q);
            closed = true;
        }
        // Notify after releasing the lock so woken threads can proceed at once.
        cv_qne.notify_all();
        cv_qnf.notify_all();
    }

    /// @return true once close() has been called.
    bool is_closed() const {
        std::lock_guard<std::mutex> lock_q(m_q);
        return closed;
    }

    /**
     * @brief Appends a pointer to the back of the queue.
     * @param in   Pointer to enqueue (stored as is, not dereferenced).
     * @param wait If true, block while a bounded channel is full; if false,
     *             give up immediately when it is full.
     * @return true if the pointer was enqueued, false if the channel was full
     *         and @p wait was false.
     * @throws std::logic_error if the channel is closed (including when it is
     *         closed while this call is blocked).
     */
    bool put(item* &in, bool wait = true) {
        trace("before checking queue size in put");
        std::unique_lock<std::mutex> lock_q(m_q);
        if(is_full()) {
            if(!wait) {
                return false;
            }
            // The predicate is evaluated with m_q held, so a concurrent
            // get()/close() cannot slip in between the check and the sleep.
            cv_qnf.wait(lock_q, [this]() -> bool { return closed || !is_full(); });
        }
        trace("after checking queue size in put");

        if(closed) {
            throw std::logic_error("put to closed channel");
        }

        queue.push_back(in);
        lock_q.unlock();

        trace("before notifying get in put");
        cv_qne.notify_one();
        trace("after notifying get in put");

        return true;
    }

    /**
     * @brief Removes the pointer at the front of the queue.
     * @param out  Receives the dequeued pointer; left untouched on failure.
     * @param wait If true, block until a pointer is available or the channel
     *             is closed; if false, give up immediately when it is empty.
     * @return true if a pointer was stored in @p out, false if the channel
     *         was empty and @p wait was false.
     * @throws std::logic_error if the channel is closed. Note that this is
     *         checked before dequeuing, so pointers still queued at close
     *         time are not handed out.
     */
    bool get(item* &out, bool wait = true) {
        trace("before checking queue size in get");
        std::unique_lock<std::mutex> lock_q(m_q);
        if(wait) {
            cv_qne.wait(lock_q, [this]() -> bool { return closed || !queue.empty(); });
        }
        else if(queue.empty()) {
            return false;
        }
        trace("after checking queue size in get");

        if(closed) {
            throw std::logic_error("channel is closed");
        }

        // Still holding m_q and not closed, so the queue is guaranteed non-empty.
        out = queue.front();
        queue.pop_front();
        lock_q.unlock();

        cv_qnf.notify_one();

        return true;
    }

private:
    /// Prints a trace line when the header is compiled with DEBUG defined.
    static void trace(const char* msg) {
#ifdef DEBUG
        std::cout << msg << std::endl;
#else
        (void)msg;
#endif
    }

    /// True when the channel is bounded and at capacity. Caller must hold m_q.
    bool is_full() const {
        typedef typename std::list<item* >::size_type size_type;
        return queue_max > 0 && queue.size() >= static_cast<size_type>(queue_max);
    }

    std::list<item* > queue;           // pending pointers, oldest at the front
    mutable std::mutex m_q;            // guards queue and closed
    std::condition_variable cv_qne;    // signalled when the queue becomes non-empty
    std::condition_variable cv_qnf;    // signalled when the queue becomes non-full
    bool closed;
    int queue_max;

};

// Convenience alias: a channel that transports `packet*` pointers.
using packet_channel = channel<packet>;

#endif

No comments:

Post a Comment