From 615a91f3fe5e0a557700d81ed713bfa8d4ef76a7 Mon Sep 17 00:00:00 2001 From: TUNQRT Date: Thu, 13 Mar 2025 17:35:23 +0100 Subject: [PATCH] thread pool --- task_queue.h | 0 triangulation.cpp | 177 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+) delete mode 100644 task_queue.h diff --git a/task_queue.h b/task_queue.h deleted file mode 100644 index e69de29..0000000 diff --git a/triangulation.cpp b/triangulation.cpp index e69de29..0d064ec 100644 --- a/triangulation.cpp +++ b/triangulation.cpp @@ -0,0 +1,177 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifndef libpoly092986127876181939u43295237 +#define libpoly092986127876181939u43295237 + +extern void test(void); // prints to stdout to test linking + +#include +#include + +struct LP_Point { + double x, y; +}; + +struct LP_Polygon { + std::vector< LP_Point > points; + LP_Polygon(std::vector< LP_Point >&& p) : points(p) {} +}; + +struct LP_Task : public std::enable_shared_from_this< LP_Task > { + LP_Polygon p; + double minimal_triangulation; +}; + +using LP_Task_p = std::shared_ptr< LP_Task >; + +extern LP_Task_p pickup_task(); //je thread safe, nebo si to musím ošéfit? +extern bool submit_task(LP_Task_p&&); +extern void print_stats(); + +#endif // libpoly092986127876181939u43295237 + +//############################################################ GLOBALS + +std::condition_variable can_pop; +std::condition_variable can_push; + +std::atomic_uint32_t active_producents; +std::atomic_uint32_t active_consuments; +std::atomic_bool tasks_incoming; + +//############################# MODIFY HERE + +constexpr unsigned NUM_PRODUCENTS = 20; +constexpr unsigned NUM_WORKERS = 20; + +//############################################################ COMMON STRUCTURES + +class Task { +private: + LP_Task_p task; + bool ready_for_submit; + +public: + Task() = delete; + Task(const Task&) = delete; + Task(Task&& t): task(t.task), ready_for_submit(t.ready_for_submit){ + t.task.reset(); + } + Task(LP_Task_p&& tp): task(tp), ready_for_submit(false){} + void triangulate(); +}; + +class Queue{ + std::queue q; + std::mutex q_l; + const size_t MAX_LENGHT = 100'000; + +public: + + void push(Task&& t){ + std::unique_lock pushlock(q_l); + can_push.wait(pushlock, [this]{ return (q.size() < MAX_LENGHT); }); + + + q.push(std::move(t)); + pushlock.unlock(); + can_pop.notify_one(); + } + + Task pop(){ + std::unique_lock poplock(q_l); + can_pop.wait(poplock, [this]{ return !is_empty(); }); + + Task t(std::move(q.front())); + q.pop(); + poplock.unlock(); + can_push.notify_one(); + return t; + } + + bool is_empty( ){ + return q.empty(); + } +}; + +using task_queue_ptr = std::shared_ptr; + +//############################################################ PRODUCE THREAD + +class Producent{ +private: + task_queue_ptr task_queue; +public: + Producent(task_queue_ptr tq): task_queue(tq){ active_producents++; } + Producent(const Producent&) = delete; + ~Producent(){ active_producents--; } + + Task get_task(){ + LP_Task_p tp = pickup_task(); //might need to be locked - ask Ondra + return Task(std::move(tp)); + } + + void fill_queue(){ + while(tasks_incoming){ + task_queue->push(get_task()); + } + } +}; + +void produce_thread(task_queue_ptr task_queue){ + Producent p(task_queue); + p.fill_queue(); +} + +//############################################################ WORKER THREAD + +class Consument { +private: + task_queue_ptr task_queue; +public: + Consument(task_queue_ptr tq): task_queue(tq){ active_consuments++; } + Consument(const Consument&) = delete; + ~Consument() { active_consuments--; } + + Task get_task(){ // might be redundant + return task_queue->pop(); + } + + void work(){ + while(active_producents || ! task_queue->is_empty()){ + Task t = get_task(); + t.triangulate(); + } + } +}; + +void consume_thread(task_queue_ptr task_queue){ + Consument c(task_queue); +} + +//############################################################ MAIN + +int main(){ + active_producents = 0; + Queue q; + task_queue_ptr queue_ptr(&q); + std::vector threads; + + for(unsigned int i = 0; i < NUM_PRODUCENTS; i++){ + threads.emplace_back(produce_thread, queue_ptr); + } + + std::this_thread::sleep_for(std::chrono::seconds(5)); + for(auto& t : threads){ + t.join(); + } +} \ No newline at end of file