diff --git a/prod_con.cpp b/prod_con.cpp index 4b664aa..6522825 100644 --- a/prod_con.cpp +++ b/prod_con.cpp @@ -4,25 +4,36 @@ #include #include #include +#include +#include +#include +constexpr unsigned MIN_CONSUME_TIME = 5; +constexpr unsigned MIN_PRODUCE_TIME = 2; constexpr unsigned MAX_CONSUME_TIME = 10; constexpr unsigned MAX_PRODUCE_TIME = 3; -static char prod_cnt = 'A'; -static int cons_cnt = 0; +using lock_guard = std::lock_guard; +char prod_cnt = 'A'; +int cons_cnt = 0; +std::atomic_int task_cnt; std::mutex os_l; std::ostream& os = std::cout; +class Queue; + class Task { private: + int id; std::chrono::seconds dificulty; public: Task() = delete; Task(const Task&) = delete; - Task(Task&& t) : dificulty(t.dificulty) {} - Task(std::chrono::seconds s) : dificulty(s) {} + Task(Task&& t) : dificulty(t.dificulty), id(t.id) {} + Task(std::chrono::seconds s) : dificulty(s), id(task_cnt++) {} void process() { std::this_thread::sleep_for(dificulty); } + int const get_id(){ return id; } }; class Producent { @@ -30,27 +41,31 @@ private: const char id; public: Producent() : id(prod_cnt++) { + std::lock_guard lg(os_l); os << "Producent spawned <" << id << ">" << std::endl; } Producent(const Producent&) = delete; ~Producent() = default; Task produce() { std::srand(std::time({})); - std::this_thread::sleep_for(std::chrono::seconds(std::rand() % MAX_PRODUCE_TIME)); - + std::this_thread::sleep_for(std::chrono::seconds((std::rand() % (MAX_PRODUCE_TIME - MIN_PRODUCE_TIME)) + MIN_PRODUCE_TIME)); + Task t = Task(std::chrono::seconds((std::rand() % (MAX_CONSUME_TIME - MIN_CONSUME_TIME)) + MIN_CONSUME_TIME)); os_l.lock(); - os << "Task produced <" << id << ">" << std::endl; + os << "Task <" << t.get_id() << "> produced by <" << id << ">" << std::endl; os_l.unlock(); - return Task(std::chrono::seconds(std::rand() % MAX_CONSUME_TIME)); + return std::move(t); } }; class Consument { private: const int id; + Task pickup(Queue* task_queue); + public: - Consument() : id(prod_cnt++) { + Consument() : id(cons_cnt++) { + lock_guard lg(os_l); os << "Consument spawned <" << id << ">" << std::endl; } Consument(const Consument&) = delete; @@ -59,12 +74,88 @@ public: t.process(); os_l.lock(); - os << "Task processed <" << id << ">" << std::endl; + os << "Task <" << t.get_id() << "> processed by <" << id << ">" << std::endl; os_l.unlock(); } + + void process(Queue* task_queue) { + while (1) { + Task t = pickup(task_queue); + //t.process(); + consume(std::move(t)); + } + } }; //############################################################# constexpr unsigned NUM_PRODUCENTS = 1; -constexpr unsigned NUM_CONSUMENTS = 3; \ No newline at end of file +constexpr unsigned NUM_CONSUMENTS = 3; + +class Queue{ + std::queue q; + std::mutex q_l; + const size_t MAX_LENGHT = 100'000; +public: + void push(Task&& t){ + q_l.lock(); + while(q.size() >= MAX_LENGHT){ + q_l.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + q_l.lock(); + } + q.push(std::move(t)); + q_l.unlock(); + } + + Task pop(){ + q_l.lock(); + while(is_empty()){ + q_l.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + q_l.lock(); + } + Task t(std::move(q.front())); + q.pop(); + q_l.unlock(); + + return t; + } + + bool is_empty(){ + return q.empty(); + } +}; + +Task Consument::pickup(Queue* task_queue){ + Task t = std::move(task_queue->pop()); + return t; +} + +void consumer_thread(Queue* tasks){ + Consument c; + c.process(tasks); +} + +void producer_thread(Queue* tasks){ + Producent p; + while(1){ + tasks->push(std::move(p.produce())); + } +} + +int main(){ + task_cnt = 0; + Queue q; + std::vector threads; + + while(1){ + if((prod_cnt - 'A') >= NUM_PRODUCENTS && cons_cnt >= NUM_CONSUMENTS) std::this_thread::sleep_for(std::chrono::milliseconds(10)); + else{ + if((prod_cnt - 'A') < NUM_PRODUCENTS) threads.emplace_back(producer_thread, &q); + if(cons_cnt < NUM_CONSUMENTS) threads.emplace_back(consumer_thread, &q); + } + } + + return 0; +} \ No newline at end of file