| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
- file Copyright.txt or https://cmake.org/licensing for details. */
- #ifndef cmWorkerPool_h
- #define cmWorkerPool_h
#include "cmConfigure.h" // IWYU pragma: keep
#include "cmAlgorithms.h" // IWYU pragma: keep
#include <cstdint>
#include <memory>
#include <stdint.h>
#include <string>
#include <utility>
#include <vector>
- // -- Types
- class cmWorkerPoolInternal;
- /** @class cmWorkerPool
- * @brief Thread pool with job queue
- */
- class cmWorkerPool
- {
- public:
- /**
- * Return value and output of an external process.
- */
- struct ProcessResultT
- {
- void reset();
- bool error() const
- {
- return (ExitStatus != 0) || (TermSignal != 0) || !ErrorMessage.empty();
- }
- std::int64_t ExitStatus = 0;
- int TermSignal = 0;
- std::string StdOut;
- std::string StdErr;
- std::string ErrorMessage;
- };
- /**
- * Abstract job class for concurrent job processing.
- */
- class JobT
- {
- public:
- JobT(JobT const&) = delete;
- JobT& operator=(JobT const&) = delete;
- /**
- * Virtual destructor.
- */
- virtual ~JobT();
- /**
- * Fence job flag
- *
- * Fence jobs require that:
- * - all jobs before in the queue have been processed
- * - no jobs later in the queue will be processed before this job was
- * processed
- */
- bool IsFence() const { return Fence_; }
- protected:
- /**
- * Protected default constructor
- */
- JobT(bool fence = false)
- : Fence_(fence)
- {
- }
- /**
- * Abstract processing interface that must be implement in derived classes.
- */
- virtual void Process() = 0;
- /**
- * Get the worker pool.
- * Only valid during the JobT::Process() call!
- */
- cmWorkerPool* Pool() const { return Pool_; }
- /**
- * Get the user data.
- * Only valid during the JobT::Process() call!
- */
- void* UserData() const { return Pool_->UserData(); };
- /**
- * Get the worker index.
- * This is the index of the thread processing this job and is in the range
- * [0..ThreadCount).
- * Concurrently processing jobs will never have the same WorkerIndex().
- * Only valid during the JobT::Process() call!
- */
- unsigned int WorkerIndex() const { return WorkerIndex_; }
- /**
- * Run an external read only process.
- * Use only during JobT::Process() call!
- */
- bool RunProcess(ProcessResultT& result,
- std::vector<std::string> const& command,
- std::string const& workingDirectory);
- private:
- //! Needs access to Work()
- friend class cmWorkerPoolInternal;
- //! Worker thread entry method.
- void Work(cmWorkerPool* pool, unsigned int workerIndex)
- {
- Pool_ = pool;
- WorkerIndex_ = workerIndex;
- this->Process();
- }
- private:
- cmWorkerPool* Pool_ = nullptr;
- unsigned int WorkerIndex_ = 0;
- bool Fence_ = false;
- };
- /**
- * Job handle type
- */
- typedef std::unique_ptr<JobT> JobHandleT;
- /**
- * Fence job base class
- */
- class JobFenceT : public JobT
- {
- public:
- JobFenceT()
- : JobT(true)
- {
- }
- //! Does nothing
- void Process() override{};
- };
- /**
- * Fence job that aborts the worker pool.
- *
- * Useful as the last job in the job queue.
- */
- class JobEndT : JobFenceT
- {
- public:
- //! Does nothing
- void Process() override { Pool()->Abort(); }
- };
- public:
- // -- Methods
- cmWorkerPool();
- ~cmWorkerPool();
- /**
- * Number of worker threads.
- */
- unsigned int ThreadCount() const { return ThreadCount_; }
- /**
- * Set the number of worker threads.
- *
- * Calling this method during Process() has no effect.
- */
- void SetThreadCount(unsigned int threadCount);
- /**
- * Blocking function that starts threads to process all Jobs in the queue.
- *
- * This method blocks until a job calls the Abort() method.
- * @arg threadCount Number of threads to process jobs.
- * @arg userData Common user data pointer available in all Jobs.
- */
- bool Process(void* userData = nullptr);
- /**
- * User data reference passed to Process().
- *
- * Only valid during Process().
- */
- void* UserData() const { return UserData_; }
- // -- Job processing interface
- /**
- * Clears the job queue and aborts all worker threads.
- *
- * This method is thread safe and can be called from inside a job.
- */
- void Abort();
- /**
- * Push job to the queue.
- *
- * This method is thread safe and can be called from inside a job or before
- * Process().
- */
- bool PushJob(JobHandleT&& jobHandle);
- /**
- * Push job to the queue
- *
- * This method is thread safe and can be called from inside a job or before
- * Process().
- */
- template <class T, typename... Args>
- bool EmplaceJob(Args&&... args)
- {
- return PushJob(cm::make_unique<T>(std::forward<Args>(args)...));
- }
- private:
- void* UserData_ = nullptr;
- unsigned int ThreadCount_ = 1;
- std::unique_ptr<cmWorkerPoolInternal> Int_;
- };
- #endif
|