cmWorkerPool.cxx 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787
  1. /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
  2. file Copyright.txt or https://cmake.org/licensing for details. */
  3. #include "cmWorkerPool.h"
  4. #include <algorithm>
  5. #include <array>
  6. #include <condition_variable>
  7. #include <cstddef>
  8. #include <deque>
  9. #include <functional>
  10. #include <mutex>
  11. #include <thread>
  12. #include <cm/memory>
  13. #include <cm3p/uv.h>
  14. #include "cmRange.h"
  15. #include "cmStringAlgorithms.h"
  16. #include "cmUVHandlePtr.h"
  17. #include "cmUVSignalHackRAII.h" // IWYU pragma: keep
  18. /**
  19. * @brief libuv pipe buffer class
  20. */
  21. class cmUVPipeBuffer
  22. {
  23. public:
  24. using DataRange = cmRange<const char*>;
  25. using DataFunction = std::function<void(DataRange)>;
  26. /// On error the ssize_t argument is a non zero libuv error code
  27. using EndFunction = std::function<void(ssize_t)>;
  28. public:
  29. /**
  30. * Reset to construction state
  31. */
  32. void reset();
  33. /**
  34. * Initializes uv_pipe(), uv_stream() and uv_handle()
  35. * @return true on success
  36. */
  37. bool init(uv_loop_t* uv_loop);
  38. /**
  39. * Start reading
  40. * @return true on success
  41. */
  42. bool startRead(DataFunction dataFunction, EndFunction endFunction);
  43. //! libuv pipe
  44. uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
  45. //! uv_pipe() casted to libuv stream
  46. uv_stream_t* uv_stream() const
  47. {
  48. return static_cast<uv_stream_t*>(this->UVPipe_);
  49. }
  50. //! uv_pipe() casted to libuv handle
  51. uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
  52. private:
  53. // -- Libuv callbacks
  54. static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  55. uv_buf_t* buf);
  56. static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
  57. private:
  58. cm::uv_pipe_ptr UVPipe_;
  59. std::vector<char> Buffer_;
  60. DataFunction DataFunction_;
  61. EndFunction EndFunction_;
  62. };
  63. void cmUVPipeBuffer::reset()
  64. {
  65. if (this->UVPipe_.get() != nullptr) {
  66. this->EndFunction_ = nullptr;
  67. this->DataFunction_ = nullptr;
  68. this->Buffer_.clear();
  69. this->Buffer_.shrink_to_fit();
  70. this->UVPipe_.reset();
  71. }
  72. }
  73. bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
  74. {
  75. this->reset();
  76. if (uv_loop == nullptr) {
  77. return false;
  78. }
  79. int ret = this->UVPipe_.init(*uv_loop, 0, this);
  80. return (ret == 0);
  81. }
  82. bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
  83. EndFunction endFunction)
  84. {
  85. if (this->UVPipe_.get() == nullptr) {
  86. return false;
  87. }
  88. if (!dataFunction || !endFunction) {
  89. return false;
  90. }
  91. this->DataFunction_ = std::move(dataFunction);
  92. this->EndFunction_ = std::move(endFunction);
  93. int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
  94. &cmUVPipeBuffer::UVData);
  95. return (ret == 0);
  96. }
  97. void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  98. uv_buf_t* buf)
  99. {
  100. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
  101. pipe.Buffer_.resize(suggestedSize);
  102. buf->base = pipe.Buffer_.data();
  103. buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
  104. }
  105. void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
  106. const uv_buf_t* buf)
  107. {
  108. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
  109. if (nread > 0) {
  110. if (buf->base != nullptr) {
  111. // Call data function
  112. pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
  113. }
  114. } else if (nread < 0) {
  115. // Save the end function on the stack before resetting the pipe
  116. EndFunction efunc;
  117. efunc.swap(pipe.EndFunction_);
  118. // Reset pipe before calling the end function
  119. pipe.reset();
  120. // Call end function
  121. efunc((nread == UV_EOF) ? 0 : nread);
  122. }
  123. }
  124. /**
  125. * @brief External process management class
  126. */
  127. class cmUVReadOnlyProcess
  128. {
  129. public:
  130. // -- Types
  131. //! @brief Process settings
  132. struct SetupT
  133. {
  134. std::string WorkingDirectory;
  135. std::vector<std::string> Command;
  136. cmWorkerPool::ProcessResultT* Result = nullptr;
  137. bool MergedOutput = false;
  138. };
  139. public:
  140. // -- Const accessors
  141. SetupT const& Setup() const { return this->Setup_; }
  142. cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
  143. bool IsStarted() const { return this->IsStarted_; }
  144. bool IsFinished() const { return this->IsFinished_; }
  145. // -- Runtime
  146. void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
  147. std::vector<std::string> const& command,
  148. std::string const& workingDirectory = std::string());
  149. bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
  150. private:
  151. // -- Libuv callbacks
  152. static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
  153. void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
  154. void UVPipeOutEnd(ssize_t error);
  155. void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
  156. void UVPipeErrEnd(ssize_t error);
  157. void UVTryFinish();
  158. private:
  159. // -- Setup
  160. SetupT Setup_;
  161. // -- Runtime
  162. bool IsStarted_ = false;
  163. bool IsFinished_ = false;
  164. std::function<void()> FinishedCallback_;
  165. std::vector<const char*> CommandPtr_;
  166. std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
  167. uv_process_options_t UVOptions_;
  168. cm::uv_process_ptr UVProcess_;
  169. cmUVPipeBuffer UVPipeOut_;
  170. cmUVPipeBuffer UVPipeErr_;
  171. };
  172. void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
  173. bool mergedOutput,
  174. std::vector<std::string> const& command,
  175. std::string const& workingDirectory)
  176. {
  177. this->Setup_.WorkingDirectory = workingDirectory;
  178. this->Setup_.Command = command;
  179. this->Setup_.Result = result;
  180. this->Setup_.MergedOutput = mergedOutput;
  181. }
  182. bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
  183. std::function<void()> finishedCallback)
  184. {
  185. if (this->IsStarted() || (this->Result() == nullptr)) {
  186. return false;
  187. }
  188. // Reset result before the start
  189. this->Result()->reset();
  190. // Fill command string pointers
  191. if (!this->Setup().Command.empty()) {
  192. this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
  193. for (std::string const& arg : this->Setup().Command) {
  194. this->CommandPtr_.push_back(arg.c_str());
  195. }
  196. this->CommandPtr_.push_back(nullptr);
  197. } else {
  198. this->Result()->ErrorMessage = "Empty command";
  199. }
  200. if (!this->Result()->error()) {
  201. if (!this->UVPipeOut_.init(uv_loop)) {
  202. this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
  203. }
  204. }
  205. if (!this->Result()->error()) {
  206. if (!this->UVPipeErr_.init(uv_loop)) {
  207. this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
  208. }
  209. }
  210. if (!this->Result()->error()) {
  211. // -- Setup process stdio options
  212. // stdin
  213. this->UVOptionsStdIO_[0].flags = UV_IGNORE;
  214. this->UVOptionsStdIO_[0].data.stream = nullptr;
  215. // stdout
  216. this->UVOptionsStdIO_[1].flags =
  217. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  218. this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
  219. // stderr
  220. this->UVOptionsStdIO_[2].flags =
  221. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  222. this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();
  223. // -- Setup process options
  224. std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
  225. sizeof(this->UVOptions_), 0);
  226. this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
  227. this->UVOptions_.file = this->CommandPtr_[0];
  228. this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
  229. this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
  230. this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
  231. this->UVOptions_.stdio_count =
  232. static_cast<int>(this->UVOptionsStdIO_.size());
  233. this->UVOptions_.stdio = this->UVOptionsStdIO_.data();
  234. // -- Spawn process
  235. int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
  236. if (uvErrorCode != 0) {
  237. this->Result()->ErrorMessage = "libuv process spawn failed";
  238. if (const char* uvErr = uv_strerror(uvErrorCode)) {
  239. this->Result()->ErrorMessage += ": ";
  240. this->Result()->ErrorMessage += uvErr;
  241. }
  242. }
  243. }
  244. // -- Start reading from stdio streams
  245. if (!this->Result()->error()) {
  246. if (!this->UVPipeOut_.startRead(
  247. [this](cmUVPipeBuffer::DataRange range) {
  248. this->UVPipeOutData(range);
  249. },
  250. [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
  251. this->Result()->ErrorMessage =
  252. "libuv start reading from stdout pipe failed";
  253. }
  254. }
  255. if (!this->Result()->error()) {
  256. if (!this->UVPipeErr_.startRead(
  257. [this](cmUVPipeBuffer::DataRange range) {
  258. this->UVPipeErrData(range);
  259. },
  260. [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
  261. this->Result()->ErrorMessage =
  262. "libuv start reading from stderr pipe failed";
  263. }
  264. }
  265. if (!this->Result()->error()) {
  266. this->IsStarted_ = true;
  267. this->FinishedCallback_ = std::move(finishedCallback);
  268. } else {
  269. // Clear libuv handles and finish
  270. this->UVProcess_.reset();
  271. this->UVPipeOut_.reset();
  272. this->UVPipeErr_.reset();
  273. this->CommandPtr_.clear();
  274. }
  275. return this->IsStarted();
  276. }
  277. void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
  278. int termSignal)
  279. {
  280. auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
  281. if (proc.IsStarted() && !proc.IsFinished()) {
  282. // Set error message on demand
  283. proc.Result()->ExitStatus = exitStatus;
  284. proc.Result()->TermSignal = termSignal;
  285. if (!proc.Result()->error()) {
  286. if (termSignal != 0) {
  287. proc.Result()->ErrorMessage = cmStrCat(
  288. "Process was terminated by signal ", proc.Result()->TermSignal);
  289. } else if (exitStatus != 0) {
  290. proc.Result()->ErrorMessage = cmStrCat(
  291. "Process failed with return value ", proc.Result()->ExitStatus);
  292. }
  293. }
  294. // Reset process handle
  295. proc.UVProcess_.reset();
  296. // Try finish
  297. proc.UVTryFinish();
  298. }
  299. }
  300. void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
  301. {
  302. this->Result()->StdOut.append(data.begin(), data.end());
  303. }
  304. void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
  305. {
  306. // Process pipe error
  307. if ((error != 0) && !this->Result()->error()) {
  308. this->Result()->ErrorMessage = cmStrCat(
  309. "Reading from stdout pipe failed with libuv error code ", error);
  310. }
  311. // Try finish
  312. this->UVTryFinish();
  313. }
  314. void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
  315. {
  316. std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut
  317. : &this->Result()->StdErr;
  318. str->append(data.begin(), data.end());
  319. }
  320. void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
  321. {
  322. // Process pipe error
  323. if ((error != 0) && !this->Result()->error()) {
  324. this->Result()->ErrorMessage = cmStrCat(
  325. "Reading from stderr pipe failed with libuv error code ", error);
  326. }
  327. // Try finish
  328. this->UVTryFinish();
  329. }
  330. void cmUVReadOnlyProcess::UVTryFinish()
  331. {
  332. // There still might be data in the pipes after the process has finished.
  333. // Therefore check if the process is finished AND all pipes are closed
  334. // before signaling the worker thread to continue.
  335. if ((this->UVProcess_.get() != nullptr) ||
  336. (this->UVPipeOut_.uv_pipe() != nullptr) ||
  337. (this->UVPipeErr_.uv_pipe() != nullptr)) {
  338. return;
  339. }
  340. this->IsFinished_ = true;
  341. this->FinishedCallback_();
  342. }
  343. /**
  344. * @brief Worker pool worker thread
  345. */
  346. class cmWorkerPoolWorker
  347. {
  348. public:
  349. cmWorkerPoolWorker(uv_loop_t& uvLoop);
  350. ~cmWorkerPoolWorker();
  351. cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
  352. cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
  353. /**
  354. * Set the internal thread
  355. */
  356. void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
  357. /**
  358. * Run an external process
  359. */
  360. bool RunProcess(cmWorkerPool::ProcessResultT& result,
  361. std::vector<std::string> const& command,
  362. std::string const& workingDirectory);
  363. private:
  364. // -- Libuv callbacks
  365. static void UVProcessStart(uv_async_t* handle);
  366. void UVProcessFinished();
  367. private:
  368. // -- Process management
  369. struct
  370. {
  371. std::mutex Mutex;
  372. cm::uv_async_ptr Request;
  373. std::condition_variable Condition;
  374. std::unique_ptr<cmUVReadOnlyProcess> ROP;
  375. } Proc_;
  376. // -- System thread
  377. std::thread Thread_;
  378. };
  379. cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
  380. {
  381. this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
  382. }
  383. cmWorkerPoolWorker::~cmWorkerPoolWorker()
  384. {
  385. if (this->Thread_.joinable()) {
  386. this->Thread_.join();
  387. }
  388. }
  389. bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
  390. std::vector<std::string> const& command,
  391. std::string const& workingDirectory)
  392. {
  393. if (command.empty()) {
  394. return false;
  395. }
  396. // Create process instance
  397. {
  398. std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
  399. this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
  400. this->Proc_.ROP->setup(&result, true, command, workingDirectory);
  401. }
  402. // Send asynchronous process start request to libuv loop
  403. this->Proc_.Request.send();
  404. // Wait until the process has been finished and destroyed
  405. {
  406. std::unique_lock<std::mutex> ulock(this->Proc_.Mutex);
  407. while (this->Proc_.ROP) {
  408. this->Proc_.Condition.wait(ulock);
  409. }
  410. }
  411. return !result.error();
  412. }
  413. void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
  414. {
  415. auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
  416. bool startFailed = false;
  417. {
  418. auto& Proc = wrk->Proc_;
  419. std::lock_guard<std::mutex> lock(Proc.Mutex);
  420. if (Proc.ROP && !Proc.ROP->IsStarted()) {
  421. startFailed =
  422. !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
  423. }
  424. }
  425. // Clean up if starting of the process failed
  426. if (startFailed) {
  427. wrk->UVProcessFinished();
  428. }
  429. }
  430. void cmWorkerPoolWorker::UVProcessFinished()
  431. {
  432. std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
  433. if (this->Proc_.ROP &&
  434. (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
  435. this->Proc_.ROP.reset();
  436. }
  437. // Notify idling thread
  438. this->Proc_.Condition.notify_one();
  439. }
  440. /**
  441. * @brief Private worker pool internals
  442. */
  443. class cmWorkerPoolInternal
  444. {
  445. public:
  446. // -- Constructors
  447. cmWorkerPoolInternal(cmWorkerPool* pool);
  448. ~cmWorkerPoolInternal();
  449. /**
  450. * Runs the libuv loop.
  451. */
  452. bool Process();
  453. /**
  454. * Clear queue and abort threads.
  455. */
  456. void Abort();
  457. /**
  458. * Push a job to the queue and notify a worker.
  459. */
  460. bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
  461. /**
  462. * Worker thread main loop method.
  463. */
  464. void Work(unsigned int workerIndex);
  465. // -- Request slots
  466. static void UVSlotBegin(uv_async_t* handle);
  467. static void UVSlotEnd(uv_async_t* handle);
  468. public:
  469. // -- UV loop
  470. #ifdef CMAKE_UV_SIGNAL_HACK
  471. std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
  472. #endif
  473. std::unique_ptr<uv_loop_t> UVLoop;
  474. cm::uv_async_ptr UVRequestBegin;
  475. cm::uv_async_ptr UVRequestEnd;
  476. // -- Thread pool and job queue
  477. std::mutex Mutex;
  478. bool Processing = false;
  479. bool Aborting = false;
  480. bool FenceProcessing = false;
  481. unsigned int WorkersRunning = 0;
  482. unsigned int WorkersIdle = 0;
  483. unsigned int JobsProcessing = 0;
  484. std::deque<cmWorkerPool::JobHandleT> Queue;
  485. std::condition_variable Condition;
  486. std::condition_variable ConditionFence;
  487. std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
  488. // -- References
  489. cmWorkerPool* Pool = nullptr;
  490. };
  491. void cmWorkerPool::ProcessResultT::reset()
  492. {
  493. this->ExitStatus = 0;
  494. this->TermSignal = 0;
  495. if (!this->StdOut.empty()) {
  496. this->StdOut.clear();
  497. this->StdOut.shrink_to_fit();
  498. }
  499. if (!this->StdErr.empty()) {
  500. this->StdErr.clear();
  501. this->StdErr.shrink_to_fit();
  502. }
  503. if (!this->ErrorMessage.empty()) {
  504. this->ErrorMessage.clear();
  505. this->ErrorMessage.shrink_to_fit();
  506. }
  507. }
  508. cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
  509. : Pool(pool)
  510. {
  511. // Initialize libuv loop
  512. uv_disable_stdio_inheritance();
  513. #ifdef CMAKE_UV_SIGNAL_HACK
  514. UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
  515. #endif
  516. this->UVLoop = cm::make_unique<uv_loop_t>();
  517. uv_loop_init(this->UVLoop.get());
  518. }
  519. cmWorkerPoolInternal::~cmWorkerPoolInternal()
  520. {
  521. uv_loop_close(this->UVLoop.get());
  522. }
  523. bool cmWorkerPoolInternal::Process()
  524. {
  525. // Reset state flags
  526. this->Processing = true;
  527. this->Aborting = false;
  528. // Initialize libuv asynchronous request
  529. this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
  530. this);
  531. this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
  532. this);
  533. // Send begin request
  534. this->UVRequestBegin.send();
  535. // Run libuv loop
  536. bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
  537. // Update state flags
  538. this->Processing = false;
  539. this->Aborting = false;
  540. return success;
  541. }
  542. void cmWorkerPoolInternal::Abort()
  543. {
  544. // Clear all jobs and set abort flag
  545. std::lock_guard<std::mutex> guard(this->Mutex);
  546. if (!this->Aborting) {
  547. // Register abort and clear queue
  548. this->Aborting = true;
  549. this->Queue.clear();
  550. this->Condition.notify_all();
  551. }
  552. }
  553. inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
  554. {
  555. std::lock_guard<std::mutex> guard(this->Mutex);
  556. if (this->Aborting) {
  557. return false;
  558. }
  559. // Append the job to the queue
  560. this->Queue.emplace_back(std::move(jobHandle));
  561. // Notify an idle worker if there's one
  562. if (this->WorkersIdle != 0) {
  563. this->Condition.notify_one();
  564. }
  565. // Return success
  566. return true;
  567. }
  568. void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
  569. {
  570. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  571. // Create worker threads
  572. {
  573. unsigned int const num = gint.Pool->ThreadCount();
  574. // Create workers
  575. gint.Workers.reserve(num);
  576. for (unsigned int ii = 0; ii != num; ++ii) {
  577. gint.Workers.emplace_back(
  578. cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
  579. }
  580. // Start worker threads
  581. for (unsigned int ii = 0; ii != num; ++ii) {
  582. gint.Workers[ii]->SetThread(
  583. std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
  584. }
  585. }
  586. // Destroy begin request
  587. gint.UVRequestBegin.reset();
  588. }
  589. void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
  590. {
  591. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  592. // Join and destroy worker threads
  593. gint.Workers.clear();
  594. // Destroy end request
  595. gint.UVRequestEnd.reset();
  596. }
  597. void cmWorkerPoolInternal::Work(unsigned int workerIndex)
  598. {
  599. cmWorkerPool::JobHandleT jobHandle;
  600. std::unique_lock<std::mutex> uLock(this->Mutex);
  601. // Increment running workers count
  602. ++this->WorkersRunning;
  603. // Enter worker main loop
  604. while (true) {
  605. // Abort on request
  606. if (this->Aborting) {
  607. break;
  608. }
  609. // Wait for new jobs on the main CV
  610. if (this->Queue.empty()) {
  611. ++this->WorkersIdle;
  612. this->Condition.wait(uLock);
  613. --this->WorkersIdle;
  614. continue;
  615. }
  616. // If there is a fence currently active or waiting,
  617. // sleep on the main CV and try again.
  618. if (this->FenceProcessing) {
  619. this->Condition.wait(uLock);
  620. continue;
  621. }
  622. // Pop next job from queue
  623. jobHandle = std::move(this->Queue.front());
  624. this->Queue.pop_front();
  625. // Check for fence jobs
  626. bool raisedFence = false;
  627. if (jobHandle->IsFence()) {
  628. this->FenceProcessing = true;
  629. raisedFence = true;
  630. // Wait on the Fence CV until all pending jobs are done.
  631. while (this->JobsProcessing != 0 && !this->Aborting) {
  632. this->ConditionFence.wait(uLock);
  633. }
  634. // When aborting, explicitly kick all threads alive once more.
  635. if (this->Aborting) {
  636. this->FenceProcessing = false;
  637. this->Condition.notify_all();
  638. break;
  639. }
  640. }
  641. // Unlocked scope for job processing
  642. ++this->JobsProcessing;
  643. {
  644. uLock.unlock();
  645. jobHandle->Work(this->Pool, workerIndex); // Process job
  646. jobHandle.reset(); // Destroy job
  647. uLock.lock();
  648. }
  649. --this->JobsProcessing;
  650. // If this was the thread that entered fence processing
  651. // originally, notify all idling workers that the fence
  652. // is done.
  653. if (raisedFence) {
  654. this->FenceProcessing = false;
  655. this->Condition.notify_all();
  656. }
  657. // If fence processing is still not done, notify the
  658. // the fencing worker when all active jobs are done.
  659. if (this->FenceProcessing && this->JobsProcessing == 0) {
  660. this->ConditionFence.notify_all();
  661. }
  662. }
  663. // Decrement running workers count
  664. if (--this->WorkersRunning == 0) {
  665. // Last worker thread about to finish. Send libuv event.
  666. this->UVRequestEnd.send();
  667. }
  668. }
  // Out-of-line defaulted destructor of cmWorkerPool::JobT.
  cmWorkerPool::JobT::~JobT() = default;
  670. bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
  671. std::vector<std::string> const& command,
  672. std::string const& workingDirectory)
  673. {
  674. // Get worker by index
  675. auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get();
  676. return wrk->RunProcess(result, command, workingDirectory);
  677. }
  678. cmWorkerPool::cmWorkerPool()
  679. : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
  680. {
  681. }
  // Defaulted here, after cmWorkerPoolInternal has been defined above.
  // NOTE(review): presumably needed because the header only forward declares
  // that type for the Int_ member - confirm against cmWorkerPool.h.
  cmWorkerPool::~cmWorkerPool() = default;
  683. void cmWorkerPool::SetThreadCount(unsigned int threadCount)
  684. {
  685. if (!this->Int_->Processing) {
  686. this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
  687. }
  688. }
  689. bool cmWorkerPool::Process(void* userData)
  690. {
  691. // Setup user data
  692. this->UserData_ = userData;
  693. // Run libuv loop
  694. bool success = this->Int_->Process();
  695. // Clear user data
  696. this->UserData_ = nullptr;
  697. // Return
  698. return success;
  699. }
  700. bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
  701. {
  702. return this->Int_->PushJob(std::move(jobHandle));
  703. }
  704. void cmWorkerPool::Abort()
  705. {
  706. this->Int_->Abort();
  707. }