diff --git a/prod_con.cpp b/prod_con.cpp index 81c42e2..edd1ca1 100644 --- a/prod_con.cpp +++ b/prod_con.cpp @@ -12,73 +12,102 @@ #include #include + using lock_guard = std::lock_guard; -constexpr unsigned MIN_CONSUME_TIME = 3; -constexpr unsigned MIN_PRODUCE_TIME = 2; -constexpr unsigned MAX_CONSUME_TIME = 10; +constexpr unsigned MIN_PRODUCE_TIME = 1; constexpr unsigned MAX_PRODUCE_TIME = 3; -constexpr unsigned NUM_PRODUCENTS = 1; -constexpr unsigned NUM_CONSUMENTS = 3; -constexpr unsigned TASK_AMOUNT = 5; +constexpr unsigned MIN_CONSUME_TIME = 1; +constexpr unsigned MAX_CONSUME_TIME = 3; +constexpr unsigned NUM_PRODUCENTS = 20; +constexpr unsigned NUM_CONSUMENTS = 20; +constexpr unsigned TASK_AMOUNT = 200000; char prod_cnt = 'A'; int cons_cnt = 0; std::atomic_uint32_t task_cnt; +std::atomic_uint32_t active_producers; +std::atomic_uint32_t active_consuments; std::mutex os_l; std::ostream& os = std::cout; -std::condition_variable queue_not_empty; -std::condition_variable queue_not_full; +std::mutex p_l; +std::condition_variable can_pop; +std::condition_variable can_push; std::condition_variable tasks_done; -std::atomic_bool tasks_generated; class Task { private: - std::chrono::seconds dificulty; - int id; + std::chrono::milliseconds dificulty; + unsigned int id; + bool empty = false; public: Task() = delete; Task(const Task&) = delete; Task(Task&& t) : dificulty(t.dificulty), id(t.id) {} - Task(std::chrono::seconds s) : dificulty(s), id(task_cnt++) {} + Task(std::chrono::milliseconds s) : dificulty(s), id(++task_cnt) { + if (id > TASK_AMOUNT) empty = true; + // os_l.lock(); + // print(); + // os_l.unlock(); + } + bool const is_empty(){ return empty; } void process() { std::this_thread::sleep_for(dificulty); } - int const get_id(){ return id; } + unsigned int const get_id(){ return id; } + void print(){ + // os_l.lock(); + os << "Task <" << id << "> " << "is empty: " << empty << std::endl; + // os_l.unlock(); + } }; class Queue{ - std::queue q; + std::queue> q; std::mutex q_l; const size_t MAX_LENGHT = 100'000; -public: - std::atomic_bool finished; - Queue(): finished(false) {} - void push(Task&& t){ +public: + std::atomic_bool terminating; + Queue(): terminating(false) {} + + void push(std::optional&& t){ std::unique_lock pushlock(q_l); - queue_not_full.wait(pushlock, [this]{ return (q.size() < MAX_LENGHT); }); + can_push.wait(pushlock, [this]{ return (q.size() < MAX_LENGHT); }); + q.push(std::move(t)); - if(tasks_generated) finished = 1; pushlock.unlock(); - queue_not_empty.notify_one(); + can_pop.notify_one(); } std::optional pop(){ std::unique_lock poplock(q_l); - if(finished) return {}; - queue_not_empty.wait(poplock, [this]{ return !is_empty(); }); - + can_pop.wait(poplock, [this]{ return !is_empty() || terminating; }); + + if(terminating && is_empty()) return {}; std::optional t(std::move(q.front())); q.pop(); poplock.unlock(); - queue_not_full.notify_one(); + can_push.notify_one(); return t; } bool is_empty(){ return q.empty(); } + + void terminating_sequence(){ + terminating = 1; + + while( ! q.empty() ){ + can_pop.notify_all(); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + } + + // os_l.lock(); + // os << "Termination sequence finished." << std::endl; + // os_l.unlock(); + } }; class Producent { @@ -86,21 +115,26 @@ private: public: const char id; Producent() : id(prod_cnt++) { - std::lock_guard lg(os_l); - os << "Producent spawned <" << id << ">" << std::endl; + active_producers++; + // std::lock_guard lg(os_l); + // os << "Producent spawned <" << id << ">" << std::endl; } Producent(const Producent&) = delete; - ~Producent() = default; - Task produce() { + ~Producent(){ active_producers--; } + std::optional produce() { std::srand(std::time({})); - std::this_thread::sleep_for(std::chrono::seconds((std::rand() % (MAX_PRODUCE_TIME - MIN_PRODUCE_TIME)) + MIN_PRODUCE_TIME)); - Task t = Task(std::chrono::seconds((std::rand() % (MAX_CONSUME_TIME - MIN_CONSUME_TIME)) + MIN_CONSUME_TIME)); - if(t.get_id() == TASK_AMOUNT) tasks_generated = 1; - os_l.lock(); - os << "Task <" << t.get_id() << "> produced by <" << id << ">" << std::endl; - os_l.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds( + (std::rand() % (MAX_PRODUCE_TIME - MIN_PRODUCE_TIME)) + MIN_PRODUCE_TIME)); + Task t = Task(std::chrono::milliseconds( + (std::rand() % (MAX_CONSUME_TIME - MIN_CONSUME_TIME)) + MIN_CONSUME_TIME)); - return t; + // os_l.lock(); + // os << "Task <" << t.get_id() << "> produced by <" << id << ">" << std::endl; + // os_l.unlock(); + + if(t.get_id() > TASK_AMOUNT) return {}; + std::optional T = std::move(t); + return T; } }; @@ -110,64 +144,82 @@ private: public: Consument() : id(cons_cnt++) { - lock_guard lg(os_l); - os << "Consument spawned <" << id << ">" << std::endl; + active_consuments++; + // lock_guard lg(os_l); + // os << "Consument spawned <" << id << ">" << std::endl; } Consument(const Consument&) = delete; - ~Consument() = default; + ~Consument() { + active_consuments--; + } void consume(Task&& t) { t.process(); - os_l.lock(); - os << "Task <" << t.get_id() << "> processed by <" << id << ">" << std::endl; - os_l.unlock(); + // os_l.lock(); + // os << "Task <" << t.get_id() << "> processed by <" << id << ">" << std::endl; + // os_l.unlock(); } void process(Queue* task_queue) { - while (1) { - auto t = task_queue->pop(); - if( ! t.has_value()){ - os_l.lock(); - os << "Consument <" << id << "> died" << std::endl; - os_l.unlock(); + while(1){ + std::optional t = task_queue->pop(); + if( t.has_value() ){ + consume(std::move(t.value())); + continue; + } + if( task_queue->is_empty() ) { + // os_l.lock(); + // os << "Consument <" << id << "> died" << std::endl; + // os_l.unlock(); return; } - consume(std::move(t.value())); } } }; -//############################################################# +//############################################################################################ void consument_thread(Queue* tasks){ Consument c; - c.process(tasks); + c.process(tasks); } void producer_thread(Queue* tasks){ - Producent p; + { + Producent p; + bool produce = true; + while(produce){ + std::optional t = p.produce(); + produce = t.has_value(); + tasks->push(std::move(t)); + } - while(!tasks_generated){ + // os_l.lock(); + // os << "Producent <" << p.id << "> died" << std::endl; + // os_l.unlock(); + } //here dies the producent - tasks->push(std::move(p.produce())); + if(active_producers == 0){ + // os_l.lock(); + // os << "Activating termination sequence." << std::endl; + // os_l.unlock(); + + tasks->terminating_sequence(); } - os_l.lock(); - os << "Producent <" << p.id << "> died" << std::endl; - os_l.unlock(); + } -int main(){ +int main(){ task_cnt = 0; - tasks_generated = false; Queue q; std::vector threads; std::mutex m_l; for(unsigned int i = 0; i < NUM_PRODUCENTS; i++){ - threads.emplace_back(producer_thread, &q); + threads.emplace_back(producer_thread, &q); } for(unsigned int i = 0; i < NUM_CONSUMENTS; i++){ - threads.emplace_back(consument_thread, &q); + threads.emplace_back(consument_thread, &q); } for(auto& t : threads){ @@ -176,6 +228,14 @@ int main(){ os << "joined" << std::endl; os_l.unlock(); } + ////////////////////////////////////////////////////// + + // //Task t1(std::chrono::milliseconds(500)); + // Queue t_q; + // Task t_0(std::chrono::seconds(1)); + // Task t1(std::chrono::seconds(1)); + // Task t2 = t_q.pop(); + // Task t3 = t_q.pop(); return 0; } \ No newline at end of file