maybe it actually works now git add prod_con.cpp git add prod_con.cpp git add prod_con.cpp git add prod_con.cpp

This commit is contained in:
TUNQRT 2025-03-12 12:23:43 +01:00
parent ff43f9804d
commit 2bf8a38b7c
1 changed files with 121 additions and 61 deletions

View File

@ -12,73 +12,102 @@
#include <condition_variable> #include <condition_variable>
#include <optional> #include <optional>
using lock_guard = std::lock_guard<std::mutex>; using lock_guard = std::lock_guard<std::mutex>;
constexpr unsigned MIN_CONSUME_TIME = 3; constexpr unsigned MIN_PRODUCE_TIME = 1;
constexpr unsigned MIN_PRODUCE_TIME = 2;
constexpr unsigned MAX_CONSUME_TIME = 10;
constexpr unsigned MAX_PRODUCE_TIME = 3; constexpr unsigned MAX_PRODUCE_TIME = 3;
constexpr unsigned NUM_PRODUCENTS = 1; constexpr unsigned MIN_CONSUME_TIME = 1;
constexpr unsigned NUM_CONSUMENTS = 3; constexpr unsigned MAX_CONSUME_TIME = 3;
constexpr unsigned TASK_AMOUNT = 5; constexpr unsigned NUM_PRODUCENTS = 20;
constexpr unsigned NUM_CONSUMENTS = 20;
constexpr unsigned TASK_AMOUNT = 200000;
char prod_cnt = 'A'; char prod_cnt = 'A';
int cons_cnt = 0; int cons_cnt = 0;
std::atomic_uint32_t task_cnt; std::atomic_uint32_t task_cnt;
std::atomic_uint32_t active_producers;
std::atomic_uint32_t active_consuments;
std::mutex os_l; std::mutex os_l;
std::ostream& os = std::cout; std::ostream& os = std::cout;
std::condition_variable queue_not_empty; std::mutex p_l;
std::condition_variable queue_not_full; std::condition_variable can_pop;
std::condition_variable can_push;
std::condition_variable tasks_done; std::condition_variable tasks_done;
std::atomic_bool tasks_generated;
class Task { class Task {
private: private:
std::chrono::seconds dificulty; std::chrono::milliseconds dificulty;
int id; unsigned int id;
bool empty = false;
public: public:
Task() = delete; Task() = delete;
Task(const Task&) = delete; Task(const Task&) = delete;
Task(Task&& t) : dificulty(t.dificulty), id(t.id) {} Task(Task&& t) : dificulty(t.dificulty), id(t.id) {}
Task(std::chrono::seconds s) : dificulty(s), id(task_cnt++) {} Task(std::chrono::milliseconds s) : dificulty(s), id(++task_cnt) {
if (id > TASK_AMOUNT) empty = true;
// os_l.lock();
// print();
// os_l.unlock();
}
bool const is_empty(){ return empty; }
void process() { std::this_thread::sleep_for(dificulty); } void process() { std::this_thread::sleep_for(dificulty); }
int const get_id(){ return id; } unsigned int const get_id(){ return id; }
void print(){
// os_l.lock();
os << "Task <" << id << "> " << "is empty: " << empty << std::endl;
// os_l.unlock();
}
}; };
class Queue{ class Queue{
std::queue<Task> q; std::queue<std::optional<Task>> q;
std::mutex q_l; std::mutex q_l;
const size_t MAX_LENGHT = 100'000; const size_t MAX_LENGHT = 100'000;
public:
std::atomic_bool finished;
Queue(): finished(false) {}
void push(Task&& t){ public:
std::atomic_bool terminating;
Queue(): terminating(false) {}
void push(std::optional<Task>&& t){
std::unique_lock<std::mutex> pushlock(q_l); std::unique_lock<std::mutex> pushlock(q_l);
queue_not_full.wait(pushlock, [this]{ return (q.size() < MAX_LENGHT); }); can_push.wait(pushlock, [this]{ return (q.size() < MAX_LENGHT); });
q.push(std::move(t)); q.push(std::move(t));
if(tasks_generated) finished = 1;
pushlock.unlock(); pushlock.unlock();
queue_not_empty.notify_one(); can_pop.notify_one();
} }
std::optional<Task> pop(){ std::optional<Task> pop(){
std::unique_lock<std::mutex> poplock(q_l); std::unique_lock<std::mutex> poplock(q_l);
if(finished) return {}; can_pop.wait(poplock, [this]{ return !is_empty() || terminating; });
queue_not_empty.wait(poplock, [this]{ return !is_empty(); });
if(terminating && is_empty()) return {};
std::optional<Task> t(std::move(q.front())); std::optional<Task> t(std::move(q.front()));
q.pop(); q.pop();
poplock.unlock(); poplock.unlock();
queue_not_full.notify_one(); can_push.notify_one();
return t; return t;
} }
bool is_empty(){ bool is_empty(){
return q.empty(); return q.empty();
} }
void terminating_sequence(){
terminating = 1;
while( ! q.empty() ){
can_pop.notify_all();
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
// os_l.lock();
// os << "Termination sequence finished." << std::endl;
// os_l.unlock();
}
}; };
class Producent { class Producent {
@ -86,21 +115,26 @@ private:
public: public:
const char id; const char id;
Producent() : id(prod_cnt++) { Producent() : id(prod_cnt++) {
std::lock_guard<std::mutex> lg(os_l); active_producers++;
os << "Producent spawned <" << id << ">" << std::endl; // std::lock_guard<std::mutex> lg(os_l);
// os << "Producent spawned <" << id << ">" << std::endl;
} }
Producent(const Producent&) = delete; Producent(const Producent&) = delete;
~Producent() = default; ~Producent(){ active_producers--; }
Task produce() { std::optional<Task> produce() {
std::srand(std::time({})); std::srand(std::time({}));
std::this_thread::sleep_for(std::chrono::seconds((std::rand() % (MAX_PRODUCE_TIME - MIN_PRODUCE_TIME)) + MIN_PRODUCE_TIME)); std::this_thread::sleep_for(std::chrono::milliseconds(
Task t = Task(std::chrono::seconds((std::rand() % (MAX_CONSUME_TIME - MIN_CONSUME_TIME)) + MIN_CONSUME_TIME)); (std::rand() % (MAX_PRODUCE_TIME - MIN_PRODUCE_TIME)) + MIN_PRODUCE_TIME));
if(t.get_id() == TASK_AMOUNT) tasks_generated = 1; Task t = Task(std::chrono::milliseconds(
os_l.lock(); (std::rand() % (MAX_CONSUME_TIME - MIN_CONSUME_TIME)) + MIN_CONSUME_TIME));
os << "Task <" << t.get_id() << "> produced by <" << id << ">" << std::endl;
os_l.unlock();
return t; // os_l.lock();
// os << "Task <" << t.get_id() << "> produced by <" << id << ">" << std::endl;
// os_l.unlock();
if(t.get_id() > TASK_AMOUNT) return {};
std::optional<Task> T = std::move(t);
return T;
} }
}; };
@ -110,64 +144,82 @@ private:
public: public:
Consument() : id(cons_cnt++) { Consument() : id(cons_cnt++) {
lock_guard lg(os_l); active_consuments++;
os << "Consument spawned <" << id << ">" << std::endl; // lock_guard lg(os_l);
// os << "Consument spawned <" << id << ">" << std::endl;
} }
Consument(const Consument&) = delete; Consument(const Consument&) = delete;
~Consument() = default; ~Consument() {
active_consuments--;
}
void consume(Task&& t) { void consume(Task&& t) {
t.process(); t.process();
os_l.lock(); // os_l.lock();
os << "Task <" << t.get_id() << "> processed by <" << id << ">" << std::endl; // os << "Task <" << t.get_id() << "> processed by <" << id << ">" << std::endl;
os_l.unlock(); // os_l.unlock();
} }
void process(Queue* task_queue) { void process(Queue* task_queue) {
while (1) { while(1){
auto t = task_queue->pop(); std::optional<Task> t = task_queue->pop();
if( ! t.has_value()){ if( t.has_value() ){
os_l.lock(); consume(std::move(t.value()));
os << "Consument <" << id << "> died" << std::endl; continue;
os_l.unlock(); }
if( task_queue->is_empty() ) {
// os_l.lock();
// os << "Consument <" << id << "> died" << std::endl;
// os_l.unlock();
return; return;
} }
consume(std::move(t.value()));
} }
} }
}; };
//############################################################# //############################################################################################
void consument_thread(Queue* tasks){ void consument_thread(Queue* tasks){
Consument c; Consument c;
c.process(tasks); c.process(tasks);
} }
void producer_thread(Queue* tasks){ void producer_thread(Queue* tasks){
Producent p; {
Producent p;
bool produce = true;
while(produce){
std::optional<Task> t = p.produce();
produce = t.has_value();
tasks->push(std::move(t));
}
while(!tasks_generated){ // os_l.lock();
// os << "Producent <" << p.id << "> died" << std::endl;
// os_l.unlock();
} //here dies the producent
tasks->push(std::move(p.produce())); if(active_producers == 0){
// os_l.lock();
// os << "Activating termination sequence." << std::endl;
// os_l.unlock();
tasks->terminating_sequence();
} }
os_l.lock();
os << "Producent <" << p.id << "> died" << std::endl;
os_l.unlock();
} }
int main(){ int main(){
task_cnt = 0; task_cnt = 0;
tasks_generated = false;
Queue q; Queue q;
std::vector<std::thread> threads; std::vector<std::thread> threads;
std::mutex m_l; std::mutex m_l;
for(unsigned int i = 0; i < NUM_PRODUCENTS; i++){ for(unsigned int i = 0; i < NUM_PRODUCENTS; i++){
threads.emplace_back(producer_thread, &q); threads.emplace_back(producer_thread, &q);
} }
for(unsigned int i = 0; i < NUM_CONSUMENTS; i++){ for(unsigned int i = 0; i < NUM_CONSUMENTS; i++){
threads.emplace_back(consument_thread, &q); threads.emplace_back(consument_thread, &q);
} }
for(auto& t : threads){ for(auto& t : threads){
@ -176,6 +228,14 @@ int main(){
os << "joined" << std::endl; os << "joined" << std::endl;
os_l.unlock(); os_l.unlock();
} }
//////////////////////////////////////////////////////
// //Task t1(std::chrono::milliseconds(500));
// Queue t_q;
// Task t_0(std::chrono::seconds(1));
// Task t1(std::chrono::seconds(1));
// Task t2 = t_q.pop();
// Task t3 = t_q.pop();
return 0; return 0;
} }