// TODO: known bugs:
// - produce and push calls in producent thread must be atomic, otherwise we can overproduce tasks
//   NOTE(review): task ids come from the atomic `task_cnt`, and Producent::produce()
//   discards every Task whose id exceeds TASK_AMOUNT, so this may already be
//   covered -- confirm before removing the entry.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <random>
#include <thread>
#include <vector>

using lock_guard = std::lock_guard<std::mutex>;

// Bounds (in milliseconds) for the simulated production delay and for the
// simulated processing time stored in each Task. Values are drawn from the
// half-open range [MIN, MAX).
constexpr unsigned MIN_PRODUCE_TIME = 1;
constexpr unsigned MAX_PRODUCE_TIME = 3;
constexpr unsigned MIN_CONSUME_TIME = 1;
constexpr unsigned MAX_CONSUME_TIME = 3;

// Number of worker threads of each kind and the total number of real tasks.
constexpr unsigned NUM_PRODUCENTS = 20;
constexpr unsigned NUM_CONSUMENTS = 20;
constexpr unsigned TASK_AMOUNT = 200000;

// Id generators. Producent/Consument objects are constructed inside their
// worker threads, so the counters must be atomic.
std::atomic<char> prod_cnt{'A'};
std::atomic_int cons_cnt{0};

std::atomic_uint32_t task_cnt{0};           // last task id handed out
std::atomic_uint32_t active_producers{0};   // live Producent objects
std::atomic_uint32_t active_consuments{0};  // live Consument objects

std::mutex os_l;               // guards writes to `os`
std::ostream& os = std::cout;
std::mutex p_l;
std::condition_variable can_pop;   // signalled when the queue gains an item or terminates
std::condition_variable can_push;  // signalled when the queue loses an item
std::condition_variable tasks_done;

namespace {

// Returns a duration of a whole number of milliseconds drawn uniformly from
// [min_ms, max_ms). Each thread owns its own engine, so concurrent callers do
// not share generator state. A degenerate range yields `min_ms`.
std::chrono::milliseconds random_duration(unsigned min_ms, unsigned max_ms) {
    if (max_ms <= min_ms + 1) {
        return std::chrono::milliseconds(min_ms);
    }
    thread_local std::mt19937 engine{std::random_device{}()};
    std::uniform_int_distribution<unsigned> dist(min_ms, max_ms - 1);
    return std::chrono::milliseconds(dist(engine));
}

}  // namespace

// A unit of simulated work: processing it sleeps for `dificulty`.
// Every constructed Task takes the next id from `task_cnt`; a Task whose id
// exceeds TASK_AMOUNT is flagged as empty.
class Task {
private:
    std::chrono::milliseconds dificulty;
    unsigned int id;
    bool empty = false;

public:
    Task() = delete;
    Task(const Task&) = delete;

    // Moves carry over every field, including the `empty` flag.
    Task(Task&& t) noexcept : dificulty(t.dificulty), id(t.id), empty(t.empty) {}

    explicit Task(std::chrono::milliseconds s) : dificulty(s), id(++task_cnt) {
        if (id > TASK_AMOUNT) empty = true;
        // os_l.lock();
        // print();
        // os_l.unlock();
    }

    bool is_empty() const { return empty; }

    // Simulates the work by blocking the calling thread for `dificulty`.
    void process() { std::this_thread::sleep_for(dificulty); }

    unsigned int get_id() const { return id; }

    void print() const {
        // os_l.lock();
        os << "Task <" << id << "> " << "is empty: " << empty << std::endl;
        // os_l.unlock();
    }
};

// Bounded FIFO of optional tasks shared by producers and consumers.
// An empty optional in the queue is an "I am done" marker pushed by a producer.
// All access to `q` happens under `q_l`.
class Queue {
    std::queue<std::optional<Task>> q;
    std::mutex q_l;
    const std::size_t MAX_LENGHT = 100'000;

public:
    std::atomic_bool terminating;

    Queue() : terminating(false) {}

    // Blocks while the queue is full, then appends `t` and wakes one consumer.
    void push(std::optional<Task>&& t) {
        {
            std::unique_lock<std::mutex> pushlock(q_l);
            can_push.wait(pushlock, [this] { return q.size() < MAX_LENGHT; });
            q.push(std::move(t));
        }
        can_pop.notify_one();
    }

    // Blocks until an item is available or termination was requested.
    // Returns the front item (which may itself be an empty optional), or an
    // empty optional when the queue is empty and termination was requested.
    std::optional<Task> pop() {
        std::unique_lock<std::mutex> poplock(q_l);
        can_pop.wait(poplock, [this] { return !q.empty() || terminating; });
        if (q.empty()) return {};  // only reachable once `terminating` is set

        std::optional<Task> t(std::move(q.front()));
        q.pop();
        poplock.unlock();
        can_push.notify_one();
        return t;
    }

    // Snapshot of emptiness, taken under the queue lock.
    bool is_empty() {
        lock_guard guard(q_l);
        return q.empty();
    }

    // Requests termination and wakes every consumer blocked in pop().
    // The flag is set while holding `q_l` so that a consumer cannot test the
    // wait predicate and go to sleep between the store and the notification.
    // Consumers keep draining the queue; pop() only reports termination once
    // the queue is empty. Calling this more than once is harmless.
    void terminating_sequence() {
        {
            lock_guard guard(q_l);
            terminating = true;
        }
        can_pop.notify_all();
        // os_l.lock();
        // os << "Termination sequence finished." << std::endl;
        // os_l.unlock();
    }
};

// Creates tasks after a random delay. Registers itself in `active_producers`
// for its lifetime.
class Producent {
private:
public:
    const char id;

    Producent() : id(prod_cnt++) {
        active_producers++;
        // std::lock_guard lg(os_l);
        // os << "Producent spawned <" << id << ">" << std::endl;
    }
    Producent(const Producent&) = delete;
    ~Producent() { active_producers--; }

    // Sleeps for a random production time, then creates a Task with a random
    // processing time. Returns an empty optional once the task budget
    // (TASK_AMOUNT) is exhausted.
    std::optional<Task> produce() {
        std::this_thread::sleep_for(
            random_duration(MIN_PRODUCE_TIME, MAX_PRODUCE_TIME));
        Task t(random_duration(MIN_CONSUME_TIME, MAX_CONSUME_TIME));
        // os_l.lock();
        // os << "Task <" << t.get_id() << "> produced by <" << id << ">" << std::endl;
        // os_l.unlock();
        if (t.get_id() > TASK_AMOUNT) return {};
        return std::optional<Task>(std::move(t));
    }
};

// Processes tasks taken from a Queue. Registers itself in `active_consuments`
// for its lifetime.
class Consument {
private:
    const int id;

public:
    Consument() : id(cons_cnt++) {
        active_consuments++;
        // lock_guard lg(os_l);
        // os << "Consument spawned <" << id << ">" << std::endl;
    }
    Consument(const Consument&) = delete;
    ~Consument() { active_consuments--; }

    void consume(Task&& t) {
        t.process();
        // os_l.lock();
        // os << "Task <" << t.get_id() << "> processed by <" << id << ">" << std::endl;
        // os_l.unlock();
    }

    // Pops and processes tasks until pop() yields an empty optional while the
    // queue is empty (either a producer's end marker was the last item, or
    // termination was requested and nothing is left).
    void process(Queue* task_queue) {
        while (true) {
            std::optional<Task> t = task_queue->pop();
            if (t.has_value()) {
                consume(std::move(*t));
                continue;
            }
            if (task_queue->is_empty()) {
                // os_l.lock();
                // os << "Consument <" << id << "> died" << std::endl;
                // os_l.unlock();
                return;
            }
        }
    }
};

//############################################################################################

// Thread entry point: one consumer working on `tasks` until it is drained.
void consument_thread(Queue* tasks) {
    Consument c;
    c.process(tasks);
}

// Thread entry point: one producer feeding `tasks`. The final, empty result of
// produce() is pushed as well and acts as this producer's end marker.
void producer_thread(Queue* tasks) {
    Producent p;
    bool produce = true;
    while (produce) {
        std::optional<Task> t = p.produce();
        produce = t.has_value();
        tasks->push(std::move(t));
    }
    // os_l.lock();
    // os << "Producent <" << p.id << "> died" << std::endl;
    // os_l.unlock();
}

int main() {
    Queue q;
    std::vector<std::thread> threads;
    threads.reserve(NUM_PRODUCENTS + NUM_CONSUMENTS);

    for (unsigned int i = 0; i < NUM_PRODUCENTS; i++) {
        threads.emplace_back(producer_thread, &q);
    }
    for (unsigned int i = 0; i < NUM_CONSUMENTS; i++) {
        threads.emplace_back(consument_thread, &q);
    }

    const auto join_and_report = [](std::thread& t) {
        t.join();
        lock_guard guard(os_l);
        os << "joined" << std::endl;
    };

    // Producers occupy the first NUM_PRODUCENTS slots. Termination is signalled
    // only after every one of them has finished, so no producer can still be
    // pushing when consumers are told to stop.
    for (std::size_t i = 0; i < NUM_PRODUCENTS; ++i) {
        join_and_report(threads[i]);
    }

    // os_l.lock();
    // os << "Activating termination sequence." << std::endl;
    // os_l.unlock();
    q.terminating_sequence();

    for (std::size_t i = NUM_PRODUCENTS; i < threads.size(); ++i) {
        join_and_report(threads[i]);
    }

    //////////////////////////////////////////////////////
    // //Task t1(std::chrono::milliseconds(500));
    // Queue t_q;
    // Task t_0(std::chrono::seconds(1));
    // Task t1(std::chrono::seconds(1));
    // Task t2 = t_q.pop();
    // Task t3 = t_q.pop();
    return 0;
}