cmWorkerPool.cxx 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781
  1. /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
  2. file Copyright.txt or https://cmake.org/licensing for details. */
  3. #include "cmWorkerPool.h"
  4. #include <algorithm>
  5. #include <array>
  6. #include <condition_variable>
  7. #include <cstddef>
  8. #include <deque>
  9. #include <functional>
  10. #include <mutex>
  11. #include <thread>
  12. #include <cm/memory>
  13. #include <cm3p/uv.h>
  14. #include "cmRange.h"
  15. #include "cmStringAlgorithms.h"
  16. #include "cmUVHandlePtr.h"
  17. #include "cmUVSignalHackRAII.h" // IWYU pragma: keep
  18. /**
  19. * @brief libuv pipe buffer class
  20. */
  21. class cmUVPipeBuffer
  22. {
  23. public:
  24. using DataRange = cmRange<const char*>;
  25. using DataFunction = std::function<void(DataRange)>;
  26. /// On error the ssize_t argument is a non zero libuv error code
  27. using EndFunction = std::function<void(ssize_t)>;
  28. /**
  29. * Reset to construction state
  30. */
  31. void reset();
  32. /**
  33. * Initializes uv_pipe(), uv_stream() and uv_handle()
  34. * @return true on success
  35. */
  36. bool init(uv_loop_t* uv_loop);
  37. /**
  38. * Start reading
  39. * @return true on success
  40. */
  41. bool startRead(DataFunction dataFunction, EndFunction endFunction);
  42. //! libuv pipe
  43. uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
  44. //! uv_pipe() casted to libuv stream
  45. uv_stream_t* uv_stream() const
  46. {
  47. return static_cast<uv_stream_t*>(this->UVPipe_);
  48. }
  49. //! uv_pipe() casted to libuv handle
  50. uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
  51. private:
  52. // -- Libuv callbacks
  53. static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  54. uv_buf_t* buf);
  55. static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
  56. cm::uv_pipe_ptr UVPipe_;
  57. std::vector<char> Buffer_;
  58. DataFunction DataFunction_;
  59. EndFunction EndFunction_;
  60. };
  61. void cmUVPipeBuffer::reset()
  62. {
  63. if (this->UVPipe_.get() != nullptr) {
  64. this->EndFunction_ = nullptr;
  65. this->DataFunction_ = nullptr;
  66. this->Buffer_.clear();
  67. this->Buffer_.shrink_to_fit();
  68. this->UVPipe_.reset();
  69. }
  70. }
  71. bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
  72. {
  73. this->reset();
  74. if (uv_loop == nullptr) {
  75. return false;
  76. }
  77. int ret = this->UVPipe_.init(*uv_loop, 0, this);
  78. return (ret == 0);
  79. }
  80. bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
  81. EndFunction endFunction)
  82. {
  83. if (this->UVPipe_.get() == nullptr) {
  84. return false;
  85. }
  86. if (!dataFunction || !endFunction) {
  87. return false;
  88. }
  89. this->DataFunction_ = std::move(dataFunction);
  90. this->EndFunction_ = std::move(endFunction);
  91. int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
  92. &cmUVPipeBuffer::UVData);
  93. return (ret == 0);
  94. }
  95. void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
  96. uv_buf_t* buf)
  97. {
  98. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
  99. pipe.Buffer_.resize(suggestedSize);
  100. buf->base = pipe.Buffer_.data();
  101. buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
  102. }
  103. void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
  104. const uv_buf_t* buf)
  105. {
  106. auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
  107. if (nread > 0) {
  108. if (buf->base != nullptr) {
  109. // Call data function
  110. pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
  111. }
  112. } else if (nread < 0) {
  113. // Save the end function on the stack before resetting the pipe
  114. EndFunction efunc;
  115. efunc.swap(pipe.EndFunction_);
  116. // Reset pipe before calling the end function
  117. pipe.reset();
  118. // Call end function
  119. efunc((nread == UV_EOF) ? 0 : nread);
  120. }
  121. }
  122. /**
  123. * @brief External process management class
  124. */
  125. class cmUVReadOnlyProcess
  126. {
  127. public:
  128. // -- Types
  129. //! @brief Process settings
  130. struct SetupT
  131. {
  132. std::string WorkingDirectory;
  133. std::vector<std::string> Command;
  134. cmWorkerPool::ProcessResultT* Result = nullptr;
  135. bool MergedOutput = false;
  136. };
  137. // -- Const accessors
  138. SetupT const& Setup() const { return this->Setup_; }
  139. cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
  140. bool IsStarted() const { return this->IsStarted_; }
  141. bool IsFinished() const { return this->IsFinished_; }
  142. // -- Runtime
  143. void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
  144. std::vector<std::string> const& command,
  145. std::string const& workingDirectory = std::string());
  146. bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
  147. private:
  148. // -- Libuv callbacks
  149. static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
  150. void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
  151. void UVPipeOutEnd(ssize_t error);
  152. void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
  153. void UVPipeErrEnd(ssize_t error);
  154. void UVTryFinish();
  155. // -- Setup
  156. SetupT Setup_;
  157. // -- Runtime
  158. bool IsStarted_ = false;
  159. bool IsFinished_ = false;
  160. std::function<void()> FinishedCallback_;
  161. std::vector<const char*> CommandPtr_;
  162. std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
  163. uv_process_options_t UVOptions_;
  164. cm::uv_process_ptr UVProcess_;
  165. cmUVPipeBuffer UVPipeOut_;
  166. cmUVPipeBuffer UVPipeErr_;
  167. };
  168. void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
  169. bool mergedOutput,
  170. std::vector<std::string> const& command,
  171. std::string const& workingDirectory)
  172. {
  173. this->Setup_.WorkingDirectory = workingDirectory;
  174. this->Setup_.Command = command;
  175. this->Setup_.Result = result;
  176. this->Setup_.MergedOutput = mergedOutput;
  177. }
  178. bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
  179. std::function<void()> finishedCallback)
  180. {
  181. if (this->IsStarted() || (this->Result() == nullptr)) {
  182. return false;
  183. }
  184. // Reset result before the start
  185. this->Result()->reset();
  186. // Fill command string pointers
  187. if (!this->Setup().Command.empty()) {
  188. this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
  189. for (std::string const& arg : this->Setup().Command) {
  190. this->CommandPtr_.push_back(arg.c_str());
  191. }
  192. this->CommandPtr_.push_back(nullptr);
  193. } else {
  194. this->Result()->ErrorMessage = "Empty command";
  195. }
  196. if (!this->Result()->error()) {
  197. if (!this->UVPipeOut_.init(uv_loop)) {
  198. this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
  199. }
  200. }
  201. if (!this->Result()->error()) {
  202. if (!this->UVPipeErr_.init(uv_loop)) {
  203. this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
  204. }
  205. }
  206. if (!this->Result()->error()) {
  207. // -- Setup process stdio options
  208. // stdin
  209. this->UVOptionsStdIO_[0].flags = UV_IGNORE;
  210. this->UVOptionsStdIO_[0].data.stream = nullptr;
  211. // stdout
  212. this->UVOptionsStdIO_[1].flags =
  213. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  214. this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
  215. // stderr
  216. this->UVOptionsStdIO_[2].flags =
  217. static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
  218. this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();
  219. // -- Setup process options
  220. std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
  221. sizeof(this->UVOptions_), 0);
  222. this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
  223. this->UVOptions_.file = this->CommandPtr_[0];
  224. this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
  225. this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
  226. this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
  227. this->UVOptions_.stdio_count =
  228. static_cast<int>(this->UVOptionsStdIO_.size());
  229. this->UVOptions_.stdio = this->UVOptionsStdIO_.data();
  230. // -- Spawn process
  231. int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
  232. if (uvErrorCode != 0) {
  233. this->Result()->ErrorMessage = "libuv process spawn failed";
  234. if (const char* uvErr = uv_strerror(uvErrorCode)) {
  235. this->Result()->ErrorMessage += ": ";
  236. this->Result()->ErrorMessage += uvErr;
  237. }
  238. }
  239. }
  240. // -- Start reading from stdio streams
  241. if (!this->Result()->error()) {
  242. if (!this->UVPipeOut_.startRead(
  243. [this](cmUVPipeBuffer::DataRange range) {
  244. this->UVPipeOutData(range);
  245. },
  246. [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
  247. this->Result()->ErrorMessage =
  248. "libuv start reading from stdout pipe failed";
  249. }
  250. }
  251. if (!this->Result()->error()) {
  252. if (!this->UVPipeErr_.startRead(
  253. [this](cmUVPipeBuffer::DataRange range) {
  254. this->UVPipeErrData(range);
  255. },
  256. [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
  257. this->Result()->ErrorMessage =
  258. "libuv start reading from stderr pipe failed";
  259. }
  260. }
  261. if (!this->Result()->error()) {
  262. this->IsStarted_ = true;
  263. this->FinishedCallback_ = std::move(finishedCallback);
  264. } else {
  265. // Clear libuv handles and finish
  266. this->UVProcess_.reset();
  267. this->UVPipeOut_.reset();
  268. this->UVPipeErr_.reset();
  269. this->CommandPtr_.clear();
  270. }
  271. return this->IsStarted();
  272. }
  273. void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
  274. int termSignal)
  275. {
  276. auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
  277. if (proc.IsStarted() && !proc.IsFinished()) {
  278. // Set error message on demand
  279. proc.Result()->ExitStatus = exitStatus;
  280. proc.Result()->TermSignal = termSignal;
  281. if (!proc.Result()->error()) {
  282. if (termSignal != 0) {
  283. proc.Result()->ErrorMessage = cmStrCat(
  284. "Process was terminated by signal ", proc.Result()->TermSignal);
  285. } else if (exitStatus != 0) {
  286. proc.Result()->ErrorMessage = cmStrCat(
  287. "Process failed with return value ", proc.Result()->ExitStatus);
  288. }
  289. }
  290. // Reset process handle
  291. proc.UVProcess_.reset();
  292. // Try finish
  293. proc.UVTryFinish();
  294. }
  295. }
  296. void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
  297. {
  298. this->Result()->StdOut.append(data.begin(), data.end());
  299. }
  300. void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
  301. {
  302. // Process pipe error
  303. if ((error != 0) && !this->Result()->error()) {
  304. this->Result()->ErrorMessage = cmStrCat(
  305. "Reading from stdout pipe failed with libuv error code ", error);
  306. }
  307. // Try finish
  308. this->UVTryFinish();
  309. }
  310. void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
  311. {
  312. std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut
  313. : &this->Result()->StdErr;
  314. str->append(data.begin(), data.end());
  315. }
  316. void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
  317. {
  318. // Process pipe error
  319. if ((error != 0) && !this->Result()->error()) {
  320. this->Result()->ErrorMessage = cmStrCat(
  321. "Reading from stderr pipe failed with libuv error code ", error);
  322. }
  323. // Try finish
  324. this->UVTryFinish();
  325. }
  326. void cmUVReadOnlyProcess::UVTryFinish()
  327. {
  328. // There still might be data in the pipes after the process has finished.
  329. // Therefore check if the process is finished AND all pipes are closed
  330. // before signaling the worker thread to continue.
  331. if ((this->UVProcess_.get() != nullptr) ||
  332. (this->UVPipeOut_.uv_pipe() != nullptr) ||
  333. (this->UVPipeErr_.uv_pipe() != nullptr)) {
  334. return;
  335. }
  336. this->IsFinished_ = true;
  337. this->FinishedCallback_();
  338. }
  339. /**
  340. * @brief Worker pool worker thread
  341. */
  342. class cmWorkerPoolWorker
  343. {
  344. public:
  345. cmWorkerPoolWorker(uv_loop_t& uvLoop);
  346. ~cmWorkerPoolWorker();
  347. cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
  348. cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
  349. /**
  350. * Set the internal thread
  351. */
  352. void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
  353. /**
  354. * Run an external process
  355. */
  356. bool RunProcess(cmWorkerPool::ProcessResultT& result,
  357. std::vector<std::string> const& command,
  358. std::string const& workingDirectory);
  359. private:
  360. // -- Libuv callbacks
  361. static void UVProcessStart(uv_async_t* handle);
  362. void UVProcessFinished();
  363. // -- Process management
  364. struct
  365. {
  366. std::mutex Mutex;
  367. cm::uv_async_ptr Request;
  368. std::condition_variable Condition;
  369. std::unique_ptr<cmUVReadOnlyProcess> ROP;
  370. } Proc_;
  371. // -- System thread
  372. std::thread Thread_;
  373. };
  374. cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
  375. {
  376. this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
  377. }
  378. cmWorkerPoolWorker::~cmWorkerPoolWorker()
  379. {
  380. if (this->Thread_.joinable()) {
  381. this->Thread_.join();
  382. }
  383. }
  384. bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
  385. std::vector<std::string> const& command,
  386. std::string const& workingDirectory)
  387. {
  388. if (command.empty()) {
  389. return false;
  390. }
  391. // Create process instance
  392. {
  393. std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
  394. this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
  395. this->Proc_.ROP->setup(&result, true, command, workingDirectory);
  396. }
  397. // Send asynchronous process start request to libuv loop
  398. this->Proc_.Request.send();
  399. // Wait until the process has been finished and destroyed
  400. {
  401. std::unique_lock<std::mutex> ulock(this->Proc_.Mutex);
  402. while (this->Proc_.ROP) {
  403. this->Proc_.Condition.wait(ulock);
  404. }
  405. }
  406. return !result.error();
  407. }
  408. void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
  409. {
  410. auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
  411. bool startFailed = false;
  412. {
  413. auto& Proc = wrk->Proc_;
  414. std::lock_guard<std::mutex> lock(Proc.Mutex);
  415. if (Proc.ROP && !Proc.ROP->IsStarted()) {
  416. startFailed =
  417. !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
  418. }
  419. }
  420. // Clean up if starting of the process failed
  421. if (startFailed) {
  422. wrk->UVProcessFinished();
  423. }
  424. }
  425. void cmWorkerPoolWorker::UVProcessFinished()
  426. {
  427. std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
  428. if (this->Proc_.ROP &&
  429. (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
  430. this->Proc_.ROP.reset();
  431. }
  432. // Notify idling thread
  433. this->Proc_.Condition.notify_one();
  434. }
  435. /**
  436. * @brief Private worker pool internals
  437. */
  438. class cmWorkerPoolInternal
  439. {
  440. public:
  441. // -- Constructors
  442. cmWorkerPoolInternal(cmWorkerPool* pool);
  443. ~cmWorkerPoolInternal();
  444. /**
  445. * Runs the libuv loop.
  446. */
  447. bool Process();
  448. /**
  449. * Clear queue and abort threads.
  450. */
  451. void Abort();
  452. /**
  453. * Push a job to the queue and notify a worker.
  454. */
  455. bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
  456. /**
  457. * Worker thread main loop method.
  458. */
  459. void Work(unsigned int workerIndex);
  460. // -- Request slots
  461. static void UVSlotBegin(uv_async_t* handle);
  462. static void UVSlotEnd(uv_async_t* handle);
  463. // -- UV loop
  464. #ifdef CMAKE_UV_SIGNAL_HACK
  465. std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
  466. #endif
  467. std::unique_ptr<uv_loop_t> UVLoop;
  468. cm::uv_async_ptr UVRequestBegin;
  469. cm::uv_async_ptr UVRequestEnd;
  470. // -- Thread pool and job queue
  471. std::mutex Mutex;
  472. bool Processing = false;
  473. bool Aborting = false;
  474. bool FenceProcessing = false;
  475. unsigned int WorkersRunning = 0;
  476. unsigned int WorkersIdle = 0;
  477. unsigned int JobsProcessing = 0;
  478. std::deque<cmWorkerPool::JobHandleT> Queue;
  479. std::condition_variable Condition;
  480. std::condition_variable ConditionFence;
  481. std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
  482. // -- References
  483. cmWorkerPool* Pool = nullptr;
  484. };
  485. void cmWorkerPool::ProcessResultT::reset()
  486. {
  487. this->ExitStatus = 0;
  488. this->TermSignal = 0;
  489. if (!this->StdOut.empty()) {
  490. this->StdOut.clear();
  491. this->StdOut.shrink_to_fit();
  492. }
  493. if (!this->StdErr.empty()) {
  494. this->StdErr.clear();
  495. this->StdErr.shrink_to_fit();
  496. }
  497. if (!this->ErrorMessage.empty()) {
  498. this->ErrorMessage.clear();
  499. this->ErrorMessage.shrink_to_fit();
  500. }
  501. }
  502. cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
  503. : Pool(pool)
  504. {
  505. // Initialize libuv loop
  506. uv_disable_stdio_inheritance();
  507. #ifdef CMAKE_UV_SIGNAL_HACK
  508. UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
  509. #endif
  510. this->UVLoop = cm::make_unique<uv_loop_t>();
  511. uv_loop_init(this->UVLoop.get());
  512. }
  513. cmWorkerPoolInternal::~cmWorkerPoolInternal()
  514. {
  515. uv_loop_close(this->UVLoop.get());
  516. }
  517. bool cmWorkerPoolInternal::Process()
  518. {
  519. // Reset state flags
  520. this->Processing = true;
  521. this->Aborting = false;
  522. // Initialize libuv asynchronous request
  523. this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
  524. this);
  525. this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
  526. this);
  527. // Send begin request
  528. this->UVRequestBegin.send();
  529. // Run libuv loop
  530. bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
  531. // Update state flags
  532. this->Processing = false;
  533. this->Aborting = false;
  534. return success;
  535. }
  536. void cmWorkerPoolInternal::Abort()
  537. {
  538. // Clear all jobs and set abort flag
  539. std::lock_guard<std::mutex> guard(this->Mutex);
  540. if (!this->Aborting) {
  541. // Register abort and clear queue
  542. this->Aborting = true;
  543. this->Queue.clear();
  544. this->Condition.notify_all();
  545. }
  546. }
  547. inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
  548. {
  549. std::lock_guard<std::mutex> guard(this->Mutex);
  550. if (this->Aborting) {
  551. return false;
  552. }
  553. // Append the job to the queue
  554. this->Queue.emplace_back(std::move(jobHandle));
  555. // Notify an idle worker if there's one
  556. if (this->WorkersIdle != 0) {
  557. this->Condition.notify_one();
  558. }
  559. // Return success
  560. return true;
  561. }
  562. void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
  563. {
  564. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  565. // Create worker threads
  566. {
  567. unsigned int const num = gint.Pool->ThreadCount();
  568. // Create workers
  569. gint.Workers.reserve(num);
  570. for (unsigned int ii = 0; ii != num; ++ii) {
  571. gint.Workers.emplace_back(
  572. cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
  573. }
  574. // Start worker threads
  575. for (unsigned int ii = 0; ii != num; ++ii) {
  576. gint.Workers[ii]->SetThread(
  577. std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
  578. }
  579. }
  580. // Destroy begin request
  581. gint.UVRequestBegin.reset();
  582. }
  583. void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
  584. {
  585. auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
  586. // Join and destroy worker threads
  587. gint.Workers.clear();
  588. // Destroy end request
  589. gint.UVRequestEnd.reset();
  590. }
  591. void cmWorkerPoolInternal::Work(unsigned int workerIndex)
  592. {
  593. cmWorkerPool::JobHandleT jobHandle;
  594. std::unique_lock<std::mutex> uLock(this->Mutex);
  595. // Increment running workers count
  596. ++this->WorkersRunning;
  597. // Enter worker main loop
  598. while (true) {
  599. // Abort on request
  600. if (this->Aborting) {
  601. break;
  602. }
  603. // Wait for new jobs on the main CV
  604. if (this->Queue.empty()) {
  605. ++this->WorkersIdle;
  606. this->Condition.wait(uLock);
  607. --this->WorkersIdle;
  608. continue;
  609. }
  610. // If there is a fence currently active or waiting,
  611. // sleep on the main CV and try again.
  612. if (this->FenceProcessing) {
  613. this->Condition.wait(uLock);
  614. continue;
  615. }
  616. // Pop next job from queue
  617. jobHandle = std::move(this->Queue.front());
  618. this->Queue.pop_front();
  619. // Check for fence jobs
  620. bool raisedFence = false;
  621. if (jobHandle->IsFence()) {
  622. this->FenceProcessing = true;
  623. raisedFence = true;
  624. // Wait on the Fence CV until all pending jobs are done.
  625. while (this->JobsProcessing != 0 && !this->Aborting) {
  626. this->ConditionFence.wait(uLock);
  627. }
  628. // When aborting, explicitly kick all threads alive once more.
  629. if (this->Aborting) {
  630. this->FenceProcessing = false;
  631. this->Condition.notify_all();
  632. break;
  633. }
  634. }
  635. // Unlocked scope for job processing
  636. ++this->JobsProcessing;
  637. {
  638. uLock.unlock();
  639. jobHandle->Work(this->Pool, workerIndex); // Process job
  640. jobHandle.reset(); // Destroy job
  641. uLock.lock();
  642. }
  643. --this->JobsProcessing;
  644. // If this was the thread that entered fence processing
  645. // originally, notify all idling workers that the fence
  646. // is done.
  647. if (raisedFence) {
  648. this->FenceProcessing = false;
  649. this->Condition.notify_all();
  650. }
  651. // If fence processing is still not done, notify the
  652. // the fencing worker when all active jobs are done.
  653. if (this->FenceProcessing && this->JobsProcessing == 0) {
  654. this->ConditionFence.notify_all();
  655. }
  656. }
  657. // Decrement running workers count
  658. if (--this->WorkersRunning == 0) {
  659. // Last worker thread about to finish. Send libuv event.
  660. this->UVRequestEnd.send();
  661. }
  662. }
  663. cmWorkerPool::JobT::~JobT() = default;
  664. bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
  665. std::vector<std::string> const& command,
  666. std::string const& workingDirectory)
  667. {
  668. // Get worker by index
  669. auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get();
  670. return wrk->RunProcess(result, command, workingDirectory);
  671. }
  672. cmWorkerPool::cmWorkerPool()
  673. : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
  674. {
  675. }
  676. cmWorkerPool::~cmWorkerPool() = default;
  677. void cmWorkerPool::SetThreadCount(unsigned int threadCount)
  678. {
  679. if (!this->Int_->Processing) {
  680. this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
  681. }
  682. }
  683. bool cmWorkerPool::Process(void* userData)
  684. {
  685. // Setup user data
  686. this->UserData_ = userData;
  687. // Run libuv loop
  688. bool success = this->Int_->Process();
  689. // Clear user data
  690. this->UserData_ = nullptr;
  691. // Return
  692. return success;
  693. }
  694. bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
  695. {
  696. return this->Int_->PushJob(std::move(jobHandle));
  697. }
  698. void cmWorkerPool::Abort()
  699. {
  700. this->Int_->Abort();
  701. }