From ff43f9804dd627b7b70f11a34c1abaf680bd5d52 Mon Sep 17 00:00:00 2001 From: TUNQRT <bauerovakar@gmail.com> Date: Sat, 8 Mar 2025 18:52:53 +0100 Subject: [PATCH] conditional variables & ending of program --- prod_con.cpp | 162 +++++++++++++++++++++++++++++---------------------- 1 file changed, 91 insertions(+), 71 deletions(-) diff --git a/prod_con.cpp b/prod_con.cpp index 6522825..81c42e2 100644 --- a/prod_con.cpp +++ b/prod_con.cpp @@ -1,3 +1,5 @@ +// TODO: known bugs: +// - produce and push calls in producent thread must be atomic, otherwise we can overproduce tasks #include <thread> #include <chrono> #include <ctime> @@ -7,39 +9,82 @@ #include <vector> #include <queue> #include <atomic> +#include <condition_variable> +#include <optional> -constexpr unsigned MIN_CONSUME_TIME = 5; +using lock_guard = std::lock_guard<std::mutex>; + +constexpr unsigned MIN_CONSUME_TIME = 3; constexpr unsigned MIN_PRODUCE_TIME = 2; constexpr unsigned MAX_CONSUME_TIME = 10; constexpr unsigned MAX_PRODUCE_TIME = 3; +constexpr unsigned NUM_PRODUCENTS = 1; +constexpr unsigned NUM_CONSUMENTS = 3; +constexpr unsigned TASK_AMOUNT = 5; -using lock_guard = std::lock_guard<std::mutex>; char prod_cnt = 'A'; int cons_cnt = 0; -std::atomic_int task_cnt; +std::atomic_uint32_t task_cnt; std::mutex os_l; std::ostream& os = std::cout; - -class Queue; +std::condition_variable queue_not_empty; +std::condition_variable queue_not_full; +std::condition_variable tasks_done; +std::atomic_bool tasks_generated; class Task { -private: - int id; - std::chrono::seconds dificulty; + private: + std::chrono::seconds dificulty; + int id; + public: + Task() = delete; + Task(const Task&) = delete; + Task(Task&& t) : dificulty(t.dificulty), id(t.id) {} + Task(std::chrono::seconds s) : dificulty(s), id(task_cnt++) {} + void process() { std::this_thread::sleep_for(dificulty); } + int const get_id(){ return id; } +}; + +class Queue{ + std::queue<Task> q; + std::mutex q_l; + const size_t MAX_LENGHT = 100'000; public: - Task() = delete; - Task(const Task&) = delete; - Task(Task&& t) : dificulty(t.dificulty), id(t.id) {} - Task(std::chrono::seconds s) : dificulty(s), id(task_cnt++) {} - void process() { std::this_thread::sleep_for(dificulty); } - int const get_id(){ return id; } + std::atomic_bool finished; + Queue(): finished(false) {} + + void push(Task&& t){ + std::unique_lock<std::mutex> pushlock(q_l); + queue_not_full.wait(pushlock, [this]{ return (q.size() < MAX_LENGHT); }); + + q.push(std::move(t)); + if(tasks_generated) finished = 1; + pushlock.unlock(); + queue_not_empty.notify_one(); + } + + std::optional<Task> pop(){ + std::unique_lock<std::mutex> poplock(q_l); + if(finished) return {}; + queue_not_empty.wait(poplock, [this]{ return !is_empty(); }); + + std::optional<Task> t(std::move(q.front())); + q.pop(); + poplock.unlock(); + queue_not_full.notify_one(); + return t; + } + + bool is_empty(){ + return q.empty(); + } }; class Producent { private: - const char id; public: + const char id; Producent() : id(prod_cnt++) { std::lock_guard<std::mutex> lg(os_l); os << "Producent spawned <" << id << ">" << std::endl; @@ -50,18 +95,18 @@ public: std::srand(std::time({})); std::this_thread::sleep_for(std::chrono::seconds((std::rand() % (MAX_PRODUCE_TIME - MIN_PRODUCE_TIME)) + MIN_PRODUCE_TIME)); Task t = Task(std::chrono::seconds((std::rand() % (MAX_CONSUME_TIME - MIN_CONSUME_TIME)) + MIN_CONSUME_TIME)); + if(t.get_id() == TASK_AMOUNT) tasks_generated = 1; os_l.lock(); os << "Task <" << t.get_id() << "> produced by <" << id << ">" << std::endl; os_l.unlock(); - return std::move(t); + return t; } }; class Consument { private: const int id; - Task pickup(Queue* task_queue); public: Consument() : id(cons_cnt++) { @@ -80,81 +125,56 @@ public: void process(Queue* task_queue) { while (1) { - Task t = pickup(task_queue); - //t.process(); - consume(std::move(t)); + auto t = task_queue->pop(); + if( ! t.has_value()){ + os_l.lock(); + os << "Consument <" << id << "> died" << std::endl; + os_l.unlock(); + return; + } + consume(std::move(t.value())); } } }; //############################################################# -constexpr unsigned NUM_PRODUCENTS = 1; -constexpr unsigned NUM_CONSUMENTS = 3; - -class Queue{ - std::queue<Task> q; - std::mutex q_l; - const size_t MAX_LENGHT = 100'000; -public: - void push(Task&& t){ - q_l.lock(); - while(q.size() >= MAX_LENGHT){ - q_l.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - q_l.lock(); - } - q.push(std::move(t)); - q_l.unlock(); - } - - Task pop(){ - q_l.lock(); - while(is_empty()){ - q_l.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - q_l.lock(); - } - Task t(std::move(q.front())); - q.pop(); - q_l.unlock(); - - return t; - } - - bool is_empty(){ - return q.empty(); - } -}; - -Task Consument::pickup(Queue* task_queue){ - Task t = std::move(task_queue->pop()); - return t; -} - -void consumer_thread(Queue* tasks){ +void consument_thread(Queue* tasks){ Consument c; c.process(tasks); } void producer_thread(Queue* tasks){ Producent p; - while(1){ + + while(!tasks_generated){ + tasks->push(std::move(p.produce())); } + os_l.lock(); + os << "Producent <" << p.id << "> died" << std::endl; + os_l.unlock(); } int main(){ task_cnt = 0; + tasks_generated = false; Queue q; std::vector<std::thread> threads; + std::mutex m_l; - while(1){ - if((prod_cnt - 'A') >= NUM_PRODUCENTS && cons_cnt >= NUM_CONSUMENTS) std::this_thread::sleep_for(std::chrono::milliseconds(10)); - else{ - if((prod_cnt - 'A') < NUM_PRODUCENTS) threads.emplace_back(producer_thread, &q); - if(cons_cnt < NUM_CONSUMENTS) threads.emplace_back(consumer_thread, &q); - } + for(unsigned int i = 0; i < NUM_PRODUCENTS; i++){ + threads.emplace_back(producer_thread, &q); + } + for(unsigned int i = 0; i < NUM_CONSUMENTS; i++){ + threads.emplace_back(consument_thread, &q); + } + + for(auto& t : threads){ + t.join(); + os_l.lock(); + os << "joined" << std::endl; + os_l.unlock(); } return 0;