// Producer/consumer demo.
//
// NUM_PRODUCENTS producer threads create exactly TASK_AMOUNT tasks in total and
// hand them to NUM_CONSUMENTS consumer threads through a bounded, mutex-guarded
// queue. Each task is simulated work: processing it sleeps for its "difficulty".
//
// Shutdown protocol: main() joins every producer, then closes the queue.
// Consumers keep draining the queue and only exit once it is both closed and
// empty, so no task is dropped and no consumer stays blocked forever.
//
// The formerly known bug ("produce and push must be atomic, otherwise we can
// overproduce tasks") is addressed by claim_task_slot(): a producer atomically
// reserves one of the TASK_AMOUNT slots *before* it starts producing.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <random>
#include <thread>
#include <vector>

using lock_guard = std::lock_guard<std::mutex>;

// Time bounds in seconds. Durations are drawn from the half-open range
// [MIN, MAX), i.e. the MAX value itself is never produced.
constexpr unsigned MIN_CONSUME_TIME = 3;
constexpr unsigned MIN_PRODUCE_TIME = 2;
constexpr unsigned MAX_CONSUME_TIME = 10;
constexpr unsigned MAX_PRODUCE_TIME = 3;

constexpr unsigned NUM_PRODUCENTS = 1;
constexpr unsigned NUM_CONSUMENTS = 3;
constexpr unsigned TASK_AMOUNT = 5;  // total number of tasks to produce

// Id generators. They are incremented from several threads, hence atomic.
std::atomic<char> prod_cnt{'A'};
std::atomic_int cons_cnt{0};
std::atomic_uint32_t task_cnt{0};

// Number of production slots handed out so far (see claim_task_slot()).
std::atomic_uint32_t tasks_claimed{0};

// Console output is shared by all threads; os_l serialises access to os.
std::mutex os_l;
std::ostream& os = std::cout;

// Returns a random whole-second duration in [min_s, max_s).
// If the range is empty (max_s <= min_s) min_s is returned; the original
// `rand() % (max - min)` would have divided by zero in that case.
// The engine is thread_local so concurrent callers do not share state.
std::chrono::seconds random_seconds(unsigned min_s, unsigned max_s) {
    if (max_s <= min_s) {
        return std::chrono::seconds(min_s);
    }
    thread_local std::mt19937 engine{std::random_device{}()};
    std::uniform_int_distribution<unsigned> dist(min_s, max_s - 1);
    return std::chrono::seconds(dist(engine));
}

// Atomically reserves the right to produce one task.
// Returns true for exactly the first TASK_AMOUNT calls across all threads, so
// the total number of produced tasks can never exceed TASK_AMOUNT regardless
// of how many producers race here.
bool claim_task_slot() {
    return tasks_claimed.fetch_add(1) < TASK_AMOUNT;
}

// A unit of simulated work. Move-only; every task gets a unique id taken from
// the global task counter at construction time.
class Task {
private:
    std::chrono::seconds difficulty;  // how long process() sleeps
    int id;

public:
    Task() = delete;
    Task(const Task&) = delete;
    Task& operator=(const Task&) = delete;
    Task(Task&&) noexcept = default;

    explicit Task(std::chrono::seconds s)
        : difficulty(s), id(static_cast<int>(task_cnt++)) {}

    // Simulates the work by blocking the calling thread for `difficulty`.
    void process() const { std::this_thread::sleep_for(difficulty); }

    int get_id() const { return id; }
};

// Bounded blocking FIFO of tasks shared between producers and consumers.
class Queue {
    std::queue<Task> q;
    std::mutex q_l;  // guards q and the finished/empty decision in pop()
    std::condition_variable queue_not_empty;
    std::condition_variable queue_not_full;
    static constexpr std::size_t MAX_LENGTH = 100'000;

public:
    // Set by close(): no further tasks will be pushed.
    std::atomic_bool finished;

    Queue() : finished(false) {}

    // Appends a task, blocking while the queue holds MAX_LENGTH tasks.
    // Must not be called after close().
    void push(Task&& t) {
        std::unique_lock<std::mutex> pushlock(q_l);
        queue_not_full.wait(pushlock, [this] { return q.size() < MAX_LENGTH; });
        q.push(std::move(t));
        pushlock.unlock();  // notify without holding the lock
        queue_not_empty.notify_one();
    }

    // Removes and returns the oldest task, blocking while the queue is empty
    // and still open. Returns an empty optional only when the queue has been
    // closed AND fully drained, so already queued tasks are never discarded.
    std::optional<Task> pop() {
        std::unique_lock<std::mutex> poplock(q_l);
        queue_not_empty.wait(poplock, [this] { return finished || !q.empty(); });
        if (q.empty()) {
            return std::nullopt;  // woken by close() with nothing left to do
        }
        std::optional<Task> t(std::move(q.front()));
        q.pop();
        poplock.unlock();
        queue_not_full.notify_one();
        return t;
    }

    // Marks the end of production and wakes every consumer blocked in pop().
    // The flag is set under the mutex so a consumer cannot test the predicate,
    // miss the update, and then sleep through the notification.
    void close() {
        {
            lock_guard lg(q_l);
            finished = true;
        }
        queue_not_empty.notify_all();
    }

    // Snapshot of emptiness; takes the lock itself, so do not call it while
    // already holding q_l.
    bool is_empty() {
        lock_guard lg(q_l);
        return q.empty();
    }
};

// A producer with a letter id ('A', 'B', ...) that creates tasks.
class Producent {
public:
    const char id;

    Producent() : id(prod_cnt++) {
        lock_guard lg(os_l);
        os << "Producent spawned <" << id << ">" << std::endl;
    }
    Producent(const Producent&) = delete;
    ~Producent() = default;

    // Spends a random "production" time, then returns a new task with a random
    // difficulty and logs it. The caller is responsible for having claimed a
    // slot via claim_task_slot() first.
    Task produce() {
        std::this_thread::sleep_for(
            random_seconds(MIN_PRODUCE_TIME, MAX_PRODUCE_TIME));
        Task t(random_seconds(MIN_CONSUME_TIME, MAX_CONSUME_TIME));
        {
            lock_guard lg(os_l);
            os << "Task <" << t.get_id() << "> produced by <" << id << ">"
               << std::endl;
        }
        return t;
    }
};

// A consumer with a numeric id that processes tasks taken from a Queue.
class Consument {
private:
    const int id;

public:
    Consument() : id(cons_cnt++) {
        lock_guard lg(os_l);
        os << "Consument spawned <" << id << ">" << std::endl;
    }
    Consument(const Consument&) = delete;
    ~Consument() = default;

    // Processes a single task (blocks for its difficulty) and logs completion.
    void consume(Task&& t) {
        t.process();
        lock_guard lg(os_l);
        os << "Task <" << t.get_id() << "> processed by <" << id << ">"
           << std::endl;
    }

    // Consumes tasks until the queue reports end-of-stream (closed and empty).
    void process(Queue* task_queue) {
        while (true) {
            auto t = task_queue->pop();
            if (!t.has_value()) {
                lock_guard lg(os_l);
                os << "Consument <" << id << "> died" << std::endl;
                return;
            }
            consume(std::move(*t));
        }
    }
};

//#############################################################

// Thread entry point for a consumer.
void consument_thread(Queue* tasks) {
    Consument c;
    c.process(tasks);
}

// Thread entry point for a producer: keeps producing while production slots
// are available. Claiming the slot before producing is what prevents several
// producers from collectively exceeding TASK_AMOUNT.
void producer_thread(Queue* tasks) {
    Producent p;
    while (claim_task_slot()) {
        tasks->push(p.produce());
    }
    lock_guard lg(os_l);
    os << "Producent <" << p.id << "> died" << std::endl;
}

// Joins every thread in `threads`, logging each join.
void join_all(std::vector<std::thread>& threads) {
    for (auto& t : threads) {
        t.join();
        lock_guard lg(os_l);
        os << "joined" << std::endl;
    }
}

int main() {
    Queue q;

    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;
    producers.reserve(NUM_PRODUCENTS);
    consumers.reserve(NUM_CONSUMENTS);

    for (unsigned int i = 0; i < NUM_PRODUCENTS; i++) {
        producers.emplace_back(producer_thread, &q);
    }
    for (unsigned int i = 0; i < NUM_CONSUMENTS; i++) {
        consumers.emplace_back(consument_thread, &q);
    }

    // Only once every producer has finished is it safe to declare the stream
    // complete; consumers then drain what is left and exit.
    join_all(producers);
    q.close();
    join_all(consumers);

    return 0;
}