| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- /*
- * ThreadPool.h, part of VCMI engine
- *
- * Authors: listed in file AUTHORS in main folder
- *
- * License: GNU General Public License v2.0 or later
- * Full text of license available in license.txt file, in main folder
- *
- */
- #pragma once
- #include "BlockingQueue.h"
- #include <boost/thread/future.hpp>
- #include <boost/thread/condition_variable.hpp>
- VCMI_LIB_NAMESPACE_BEGIN
/// Unit of work handled by ThreadPool: a callable taking no arguments and returning nothing.
using TRMGfunction = std::function<void()>;
/// A TRMGfunction that may be absent.
using TRMGJob = std::optional<TRMGfunction>;
- //Credit to https://github.com/Liam0205/toy-threadpool/tree/master/yuuki
- class DLL_LINKAGE ThreadPool
- {
- private:
- using Lock = std::unique_lock<std::shared_mutex>;
- mutable std::shared_mutex mx;
- mutable std::condition_variable_any cv;
- mutable std::once_flag once;
- bool isInitialized = false;
- bool stopping = false;
- bool canceling = false;
- public:
- ThreadPool();
- ~ThreadPool();
- void init(size_t numThreads);
- void spawn();
- void terminate();
- void cancel();
- public:
- bool initialized() const;
- bool running() const;
- int size() const;
- private:
- bool isRunning() const;
- public:
- auto async(std::function<void()>&& f) const -> boost::future<void>;
- private:
- std::vector<boost::thread> workers;
- mutable BlockingQueue<TRMGfunction> tasks;
- };
- ThreadPool::ThreadPool()
- {};
- ThreadPool::~ThreadPool()
- {
- terminate();
- }
- inline void ThreadPool::init(size_t numThreads)
- {
- std::call_once(once, [this, numThreads]()
- {
- Lock lock(mx);
- stopping = false;
- canceling = false;
- workers.reserve(numThreads);
- for (size_t i = 0; i < numThreads; ++i)
- {
- workers.emplace_back(std::bind(&ThreadPool::spawn, this));
- }
- isInitialized = true;
- });
- }
- bool ThreadPool::isRunning() const
- {
- return isInitialized && !stopping && !canceling;
- }
- inline bool ThreadPool::initialized() const
- {
- Lock lock(mx);
- return isInitialized;
- }
- inline bool ThreadPool::running() const
- {
- Lock lock(mx);
- return isRunning();
- }
- inline int ThreadPool::size() const
- {
- Lock lock(mx);
- return workers.size();
- }
- inline void ThreadPool::spawn()
- {
- while(true)
- {
- bool pop = false;
- TRMGfunction task;
- {
- Lock lock(mx);
- cv.wait(lock, [this, &pop, &task]
- {
- pop = tasks.pop(task);
- return canceling || stopping || pop;
- });
- }
- if (canceling || (stopping && !pop))
- {
- return;
- }
- task();
- }
- }
- inline void ThreadPool::terminate()
- {
- {
- Lock lock(mx);
- if (isRunning())
- {
- stopping = true;
- }
- else
- {
- return;
- }
- }
- cv.notify_all();
- for (auto& worker : workers)
- {
- worker.join();
- }
- }
- inline void ThreadPool::cancel()
- {
- {
- Lock lock(mx);
- if (running())
- {
- canceling = true;
- }
- else
- {
- return;
- }
- }
- tasks.clear();
- cv.notify_all();
- for (auto& worker : workers)
- {
- worker.join();
- }
- }
- auto ThreadPool::async(std::function<void()>&& f) const -> boost::future<void>
- {
- using TaskT = boost::packaged_task<void>;
- {
- Lock lock(mx);
- if (stopping || canceling)
- {
- throw std::runtime_error("Delegating task to a threadpool that has been terminated or canceled.");
- }
- }
- auto task = std::make_shared<TaskT>(f);
- boost::future<void> fut = task->get_future();
- tasks.emplace([task]() -> void
- {
- (*task)();
- });
- cv.notify_one();
- return fut;
- }
- VCMI_LIB_NAMESPACE_END
|