diff --git a/src/thread_pool.cpp b/src/thread_pool.cpp index 8ac3c163..bb4b68f6 100644 --- a/src/thread_pool.cpp +++ b/src/thread_pool.cpp @@ -22,6 +22,8 @@ namespace big LOG(VERBOSE) << "Allocating " << thread_count << " threads in thread pool."; this->m_thread_pool.reserve(thread_count); + m_available_thread_count = thread_count; + for (std::uint32_t i = 0; i < thread_count; i++) this->m_thread_pool.emplace_back(std::thread(&thread_pool::run, this)); } @@ -41,13 +43,18 @@ namespace big m_thread_pool.clear(); } - void thread_pool::push(std::function func) + void thread_pool::push(std::function func, std::source_location location) { if (func) { { std::unique_lock lock(this->m_lock); - this->m_job_stack.push(std::move(func)); + this->m_job_stack.push({func, location}); + + if (m_available_thread_count < m_job_stack.size()) + { + LOG(WARNING) << "thread_pool potentially starved"; + } } this->m_data_condition.notify_all(); } @@ -68,20 +75,28 @@ namespace big if (this->m_job_stack.empty()) continue; - std::function job = std::move(this->m_job_stack.top()); + thread_pool_job job = this->m_job_stack.top(); this->m_job_stack.pop(); lock.unlock(); + m_available_thread_count--; + try { - std::invoke(std::move(job)); + const auto source_file = std::filesystem::path(job.m_source_location.file_name()).filename().string(); + LOG(VERBOSE) << "Thread " << std::this_thread::get_id() << " executing " << source_file << ":" + << job.m_source_location.line(); + + std::invoke(job.m_func); } catch (const std::exception& e) { LOG(WARNING) << "Exception thrown while executing job in thread:" << std::endl << e.what(); } + + m_available_thread_count++; } LOG(VERBOSE) << "Thread " << std::this_thread::get_id() << " exiting..."; } -} \ No newline at end of file +} diff --git a/src/thread_pool.hpp b/src/thread_pool.hpp index 7f7532e5..5cb1a8bc 100644 --- a/src/thread_pool.hpp +++ b/src/thread_pool.hpp @@ -2,23 +2,31 @@ namespace big { + struct thread_pool_job + { + std::function m_func; + std::source_location m_source_location; + }; + class thread_pool { std::atomic m_accept_jobs; std::condition_variable m_data_condition; - std::stack> m_job_stack; + std::stack m_job_stack; std::mutex m_lock; std::vector m_thread_pool; std::thread m_managing_thread; + std::atomic m_available_thread_count; + public: thread_pool(); ~thread_pool(); void destroy(); - void push(std::function func); + void push(std::function func, std::source_location location = std::source_location::current()); private: void create(); @@ -27,4 +35,4 @@ namespace big }; inline thread_pool* g_thread_pool{}; -} \ No newline at end of file +}