thread pool

This commit is contained in:
TUNQRT 2025-03-13 17:35:23 +01:00
parent 6c3f13a6e2
commit 982d1a682e
2 changed files with 177 additions and 0 deletions

View File

View File

@ -0,0 +1,177 @@
#include <thread>
#include <chrono>
#include <ctime>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <vector>
#include <queue>
#include <atomic>
#include <condition_variable>
#ifndef libpoly092986127876181939u43295237
#define libpoly092986127876181939u43295237
extern void test(void); // prints to stdout to test linking
#include <vector>
#include <memory>
struct LP_Point {
double x, y;
};
struct LP_Polygon {
std::vector< LP_Point > points;
LP_Polygon(std::vector< LP_Point >&& p) : points(p) {}
};
struct LP_Task : public std::enable_shared_from_this< LP_Task > {
LP_Polygon p;
double minimal_triangulation;
};
using LP_Task_p = std::shared_ptr< LP_Task >;
extern LP_Task_p pickup_task(); //je thread safe, nebo si to musím ošéfit?
extern bool submit_task(LP_Task_p&&);
extern void print_stats();
#endif // libpoly092986127876181939u43295237
//############################################################ GLOBALS
std::condition_variable can_pop;
std::condition_variable can_push;
std::atomic_uint32_t active_producents;
std::atomic_uint32_t active_consuments;
std::atomic_bool tasks_incoming;
//############################# MODIFY HERE
constexpr unsigned NUM_PRODUCENTS = 20;
constexpr unsigned NUM_WORKERS = 20;
//############################################################ COMMON STRUCTURES
class Task {
private:
LP_Task_p task;
bool ready_for_submit;
public:
Task() = delete;
Task(const Task&) = delete;
Task(Task&& t): task(t.task), ready_for_submit(t.ready_for_submit){
t.task.reset();
}
Task(LP_Task_p&& tp): task(tp), ready_for_submit(false){}
void triangulate();
};
class Queue{
std::queue<Task> q;
std::mutex q_l;
const size_t MAX_LENGHT = 100'000;
public:
void push(Task&& t){
std::unique_lock<std::mutex> pushlock(q_l);
can_push.wait(pushlock, [this]{ return (q.size() < MAX_LENGHT); });
q.push(std::move(t));
pushlock.unlock();
can_pop.notify_one();
}
Task pop(){
std::unique_lock<std::mutex> poplock(q_l);
can_pop.wait(poplock, [this]{ return !is_empty(); });
Task t(std::move(q.front()));
q.pop();
poplock.unlock();
can_push.notify_one();
return t;
}
bool is_empty( ){
return q.empty();
}
};
using task_queue_ptr = std::shared_ptr<Queue>;
//############################################################ PRODUCE THREAD
class Producent{
private:
task_queue_ptr task_queue;
public:
Producent(task_queue_ptr tq): task_queue(tq){ active_producents++; }
Producent(const Producent&) = delete;
~Producent(){ active_producents--; }
Task get_task(){
LP_Task_p tp = pickup_task(); //might need to be locked - ask Ondra
return Task(std::move(tp));
}
void fill_queue(){
while(tasks_incoming){
task_queue->push(get_task());
}
}
};
void produce_thread(task_queue_ptr task_queue){
Producent p(task_queue);
p.fill_queue();
}
//############################################################ WORKER THREAD
class Consument {
private:
task_queue_ptr task_queue;
public:
Consument(task_queue_ptr tq): task_queue(tq){ active_consuments++; }
Consument(const Consument&) = delete;
~Consument() { active_consuments--; }
Task get_task(){ // might be redundant
return task_queue->pop();
}
void work(){
while(active_producents || ! task_queue->is_empty()){
Task t = get_task();
t.triangulate();
}
}
};
void consume_thread(task_queue_ptr task_queue){
Consument c(task_queue);
}
//############################################################ MAIN
int main(){
active_producents = 0;
Queue q;
task_queue_ptr queue_ptr(&q);
std::vector<std::thread> threads;
for(unsigned int i = 0; i < NUM_PRODUCENTS; i++){
threads.emplace_back(produce_thread, queue_ptr);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
for(auto& t : threads){
t.join();
}
}