cmUVJobServerClient.cxx 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. /* Distributed under the OSI-approved BSD 3-Clause License. See accompanying
  2. file Copyright.txt or https://cmake.org/licensing for details. */
  3. #include "cmUVJobServerClient.h"
  4. #include <cassert>
  5. #include <utility>
  6. #ifndef _WIN32
  7. # include <cstdio>
  8. # include <string>
  9. # include <vector>
  10. # include <fcntl.h>
  11. # include <unistd.h>
  12. #endif
  13. #include <cm/memory>
  14. #include <cm/optional>
  15. #include <cm/string_view>
  16. #include "cmRange.h"
  17. #include "cmStringAlgorithms.h"
  18. #include "cmSystemTools.h"
  19. #include "cmUVHandlePtr.h"
  20. class cmUVJobServerClient::Impl
  21. {
  22. public:
  23. uv_loop_t& Loop;
  24. cm::uv_idle_ptr ImplicitToken;
  25. std::function<void()> OnToken;
  26. std::function<void(int)> OnDisconnect;
  27. // The number of tokens held by this client.
  28. unsigned int HeldTokens = 0;
  29. // The number of tokens we need to receive from the job server.
  30. unsigned int NeedTokens = 0;
  31. Impl(uv_loop_t& loop);
  32. virtual ~Impl();
  33. virtual void SendToken() = 0;
  34. virtual void StartReceivingTokens() = 0;
  35. virtual void StopReceivingTokens() = 0;
  36. void RequestToken();
  37. void ReleaseToken();
  38. void RequestExplicitToken();
  39. void DecrementNeedTokens();
  40. void HoldToken();
  41. void RequestImplicitToken();
  42. void ReleaseImplicitToken();
  43. void ReceivedToken();
  44. void Disconnected(int status);
  45. };
  46. cmUVJobServerClient::Impl::Impl(uv_loop_t& loop)
  47. : Loop(loop)
  48. {
  49. this->ImplicitToken.init(this->Loop, this);
  50. }
  51. cmUVJobServerClient::Impl::~Impl() = default;
  52. void cmUVJobServerClient::Impl::RequestToken()
  53. {
  54. if (this->HeldTokens == 0 && !uv_is_active(this->ImplicitToken)) {
  55. this->RequestImplicitToken();
  56. } else {
  57. this->RequestExplicitToken();
  58. }
  59. }
  60. void cmUVJobServerClient::Impl::ReleaseToken()
  61. {
  62. assert(this->HeldTokens > 0);
  63. --this->HeldTokens;
  64. if (this->HeldTokens == 0) {
  65. // This was the token implicitly owned by our process.
  66. this->ReleaseImplicitToken();
  67. } else {
  68. // This was a token we received from the job server. Send it back.
  69. this->SendToken();
  70. }
  71. }
  72. void cmUVJobServerClient::Impl::RequestExplicitToken()
  73. {
  74. ++this->NeedTokens;
  75. this->StartReceivingTokens();
  76. }
  77. void cmUVJobServerClient::Impl::DecrementNeedTokens()
  78. {
  79. assert(this->NeedTokens > 0);
  80. --this->NeedTokens;
  81. if (this->NeedTokens == 0) {
  82. this->StopReceivingTokens();
  83. }
  84. }
  85. void cmUVJobServerClient::Impl::HoldToken()
  86. {
  87. ++this->HeldTokens;
  88. if (this->OnToken) {
  89. this->OnToken();
  90. } else {
  91. this->ReleaseToken();
  92. }
  93. }
  94. void cmUVJobServerClient::Impl::RequestImplicitToken()
  95. {
  96. assert(this->HeldTokens == 0);
  97. this->ImplicitToken.start([](uv_idle_t* handle) {
  98. uv_idle_stop(handle);
  99. auto* self = static_cast<Impl*>(handle->data);
  100. self->HoldToken();
  101. });
  102. }
  103. void cmUVJobServerClient::Impl::ReleaseImplicitToken()
  104. {
  105. assert(this->HeldTokens == 0);
  106. // Use the implicit token in place of receiving one from the job server.
  107. if (this->NeedTokens > 0) {
  108. this->DecrementNeedTokens();
  109. this->RequestImplicitToken();
  110. }
  111. }
  112. void cmUVJobServerClient::Impl::ReceivedToken()
  113. {
  114. this->DecrementNeedTokens();
  115. this->HoldToken();
  116. }
  117. void cmUVJobServerClient::Impl::Disconnected(int status)
  118. {
  119. if (this->OnDisconnect) {
  120. this->OnDisconnect(status);
  121. }
  122. }
  123. //---------------------------------------------------------------------------
  124. // Implementation on POSIX platforms.
  125. // https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
  126. #ifndef _WIN32
  127. namespace {
  128. class ImplPosix : public cmUVJobServerClient::Impl
  129. {
  130. public:
  131. enum class Connection
  132. {
  133. None,
  134. FDs,
  135. FIFO,
  136. };
  137. Connection Conn = Connection::None;
  138. cm::uv_pipe_ptr ConnRead;
  139. cm::uv_pipe_ptr ConnWrite;
  140. cm::uv_pipe_ptr ConnFIFO;
  141. std::shared_ptr<std::function<void(int)>> OnWrite;
  142. void Connect();
  143. void ConnectFDs(int rfd, int wfd);
  144. void ConnectFIFO(const char* path);
  145. void Disconnect(int status);
  146. cm::uv_pipe_ptr OpenFD(int fd);
  147. uv_stream_t* GetWriter() const;
  148. uv_stream_t* GetReader() const;
  149. static void OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
  150. uv_buf_t* buf);
  151. static void OnReadCB(uv_stream_t* stream, ssize_t nread,
  152. const uv_buf_t* buf);
  153. void OnAllocate(size_t suggested_size, uv_buf_t* buf);
  154. void OnRead(ssize_t nread, const uv_buf_t* buf);
  155. char ReadBuf = '.';
  156. bool ReceivingTokens = false;
  157. bool IsConnected() const;
  158. void SendToken() override;
  159. void StartReceivingTokens() override;
  160. void StopReceivingTokens() override;
  161. ImplPosix(uv_loop_t& loop);
  162. ~ImplPosix() override;
  163. };
  164. ImplPosix::ImplPosix(uv_loop_t& loop)
  165. : Impl(loop)
  166. , OnWrite(std::make_shared<std::function<void(int)>>([this](int status) {
  167. if (status != 0) {
  168. this->Disconnect(status);
  169. }
  170. }))
  171. {
  172. this->Connect();
  173. }
  174. ImplPosix::~ImplPosix()
  175. {
  176. this->Disconnect(0);
  177. }
  178. void ImplPosix::Connect()
  179. {
  180. // --jobserver-auth= for gnu make versions >= 4.2
  181. // --jobserver-fds= for gnu make versions < 4.2
  182. // -J for bsd make
  183. static const std::vector<cm::string_view> prefixes = {
  184. "--jobserver-auth=", "--jobserver-fds=", "-J"
  185. };
  186. cm::optional<std::string> makeflags = cmSystemTools::GetEnvVar("MAKEFLAGS");
  187. if (!makeflags) {
  188. return;
  189. }
  190. // Look for the *last* occurrence of jobserver flags.
  191. cm::optional<std::string> auth;
  192. std::vector<std::string> args;
  193. cmSystemTools::ParseUnixCommandLine(makeflags->c_str(), args);
  194. for (cm::string_view arg : cmReverseRange(args)) {
  195. for (cm::string_view prefix : prefixes) {
  196. if (cmHasPrefix(arg, prefix)) {
  197. auth = cmTrimWhitespace(arg.substr(prefix.length()));
  198. break;
  199. }
  200. }
  201. if (auth) {
  202. break;
  203. }
  204. }
  205. if (!auth) {
  206. return;
  207. }
  208. // fifo:PATH
  209. if (cmHasLiteralPrefix(*auth, "fifo:")) {
  210. ConnectFIFO(auth->substr(cmStrLen("fifo:")).c_str());
  211. return;
  212. }
  213. // reader,writer
  214. int reader;
  215. int writer;
  216. if (std::sscanf(auth->c_str(), "%d,%d", &reader, &writer) == 2) {
  217. ConnectFDs(reader, writer);
  218. }
  219. }
  220. cm::uv_pipe_ptr ImplPosix::OpenFD(int fd)
  221. {
  222. // Create a CLOEXEC duplicate so `uv_pipe_ptr` can close it
  223. // without closing the original file descriptor, which our
  224. // child processes might want to use too.
  225. cm::uv_pipe_ptr p;
  226. int fd_dup = dup(fd);
  227. if (fd_dup < 0) {
  228. return p;
  229. }
  230. if (fcntl(fd_dup, F_SETFD, FD_CLOEXEC) == -1) {
  231. close(fd_dup);
  232. return p;
  233. }
  234. p.init(this->Loop, 0, this);
  235. if (uv_pipe_open(p, fd_dup) < 0) {
  236. close(fd_dup);
  237. }
  238. return p;
  239. }
  240. void ImplPosix::ConnectFDs(int rfd, int wfd)
  241. {
  242. cm::uv_pipe_ptr connRead = this->OpenFD(rfd);
  243. cm::uv_pipe_ptr connWrite = this->OpenFD(wfd);
  244. // Verify that the read end is readable and the write end is writable.
  245. if (!connRead || !uv_is_readable(connRead) || //
  246. !connWrite || !uv_is_writable(connWrite)) {
  247. return;
  248. }
  249. this->ConnRead = std::move(connRead);
  250. this->ConnWrite = std::move(connWrite);
  251. this->Conn = Connection::FDs;
  252. }
  253. void ImplPosix::ConnectFIFO(const char* path)
  254. {
  255. int fd = open(path, O_RDWR);
  256. if (fd < 0) {
  257. return;
  258. }
  259. cm::uv_pipe_ptr connFIFO;
  260. connFIFO.init(this->Loop, 0, this);
  261. if (uv_pipe_open(connFIFO, fd) != 0) {
  262. close(fd);
  263. return;
  264. }
  265. // Verify that the fifo is both readable and writable.
  266. if (!connFIFO || !uv_is_readable(connFIFO) || !uv_is_writable(connFIFO)) {
  267. return;
  268. }
  269. this->ConnFIFO = std::move(connFIFO);
  270. this->Conn = Connection::FIFO;
  271. }
  272. void ImplPosix::Disconnect(int status)
  273. {
  274. if (this->Conn == Connection::None) {
  275. return;
  276. }
  277. this->StopReceivingTokens();
  278. switch (this->Conn) {
  279. case Connection::FDs:
  280. this->ConnRead.reset();
  281. this->ConnWrite.reset();
  282. break;
  283. case Connection::FIFO:
  284. this->ConnFIFO.reset();
  285. break;
  286. default:
  287. break;
  288. }
  289. this->Conn = Connection::None;
  290. if (status != 0) {
  291. this->Disconnected(status);
  292. }
  293. }
  294. uv_stream_t* ImplPosix::GetWriter() const
  295. {
  296. switch (this->Conn) {
  297. case Connection::FDs:
  298. return this->ConnWrite;
  299. case Connection::FIFO:
  300. return this->ConnFIFO;
  301. default:
  302. return nullptr;
  303. }
  304. }
  305. uv_stream_t* ImplPosix::GetReader() const
  306. {
  307. switch (this->Conn) {
  308. case Connection::FDs:
  309. return this->ConnRead;
  310. case Connection::FIFO:
  311. return this->ConnFIFO;
  312. default:
  313. return nullptr;
  314. }
  315. }
  316. void ImplPosix::OnAllocateCB(uv_handle_t* handle, size_t suggested_size,
  317. uv_buf_t* buf)
  318. {
  319. auto* self = static_cast<ImplPosix*>(handle->data);
  320. self->OnAllocate(suggested_size, buf);
  321. }
  322. void ImplPosix::OnReadCB(uv_stream_t* stream, ssize_t nread,
  323. const uv_buf_t* buf)
  324. {
  325. auto* self = static_cast<ImplPosix*>(stream->data);
  326. self->OnRead(nread, buf);
  327. }
  328. void ImplPosix::OnAllocate(size_t /*suggested_size*/, uv_buf_t* buf)
  329. {
  330. *buf = uv_buf_init(&this->ReadBuf, 1);
  331. }
  332. void ImplPosix::OnRead(ssize_t nread, const uv_buf_t* /*buf*/)
  333. {
  334. if (nread == 0) {
  335. return;
  336. }
  337. if (nread < 0) {
  338. auto status = static_cast<int>(nread);
  339. this->Disconnect(status);
  340. return;
  341. }
  342. assert(nread == 1);
  343. this->ReceivedToken();
  344. }
  345. bool ImplPosix::IsConnected() const
  346. {
  347. return this->Conn != Connection::None;
  348. }
  349. void ImplPosix::SendToken()
  350. {
  351. if (this->Conn == Connection::None) {
  352. return;
  353. }
  354. static char token = '.';
  355. uv_buf_t const buf = uv_buf_init(&token, sizeof(token));
  356. int status = cm::uv_write(this->GetWriter(), &buf, 1, this->OnWrite);
  357. if (status != 0) {
  358. this->Disconnect(status);
  359. }
  360. }
  361. void ImplPosix::StartReceivingTokens()
  362. {
  363. if (this->Conn == Connection::None) {
  364. return;
  365. }
  366. if (this->ReceivingTokens) {
  367. return;
  368. }
  369. int status = uv_read_start(this->GetReader(), &ImplPosix::OnAllocateCB,
  370. &ImplPosix::OnReadCB);
  371. if (status != 0) {
  372. this->Disconnect(status);
  373. return;
  374. }
  375. this->ReceivingTokens = true;
  376. }
  377. void ImplPosix::StopReceivingTokens()
  378. {
  379. if (this->Conn == Connection::None) {
  380. return;
  381. }
  382. if (!this->ReceivingTokens) {
  383. return;
  384. }
  385. this->ReceivingTokens = false;
  386. uv_read_stop(this->GetReader());
  387. }
  388. }
  389. #endif
  390. //---------------------------------------------------------------------------
  391. // Implementation of public interface.
  392. cmUVJobServerClient::cmUVJobServerClient(std::unique_ptr<Impl> impl)
  393. : Impl_(std::move(impl))
  394. {
  395. }
  396. cmUVJobServerClient::~cmUVJobServerClient() = default;
  397. cmUVJobServerClient::cmUVJobServerClient(cmUVJobServerClient&&) noexcept =
  398. default;
  399. cmUVJobServerClient& cmUVJobServerClient::operator=(
  400. cmUVJobServerClient&&) noexcept = default;
  401. void cmUVJobServerClient::RequestToken()
  402. {
  403. this->Impl_->RequestToken();
  404. }
  405. void cmUVJobServerClient::ReleaseToken()
  406. {
  407. this->Impl_->ReleaseToken();
  408. }
  409. int cmUVJobServerClient::GetHeldTokens() const
  410. {
  411. return this->Impl_->HeldTokens;
  412. }
  413. int cmUVJobServerClient::GetNeedTokens() const
  414. {
  415. return this->Impl_->NeedTokens;
  416. }
  417. cm::optional<cmUVJobServerClient> cmUVJobServerClient::Connect(
  418. uv_loop_t& loop, std::function<void()> onToken,
  419. std::function<void(int)> onDisconnect)
  420. {
  421. #if defined(_WIN32)
  422. // FIXME: Windows job server client not yet implemented.
  423. static_cast<void>(loop);
  424. static_cast<void>(onToken);
  425. static_cast<void>(onDisconnect);
  426. #else
  427. auto impl = cm::make_unique<ImplPosix>(loop);
  428. if (impl && impl->IsConnected()) {
  429. impl->OnToken = std::move(onToken);
  430. impl->OnDisconnect = std::move(onDisconnect);
  431. return cmUVJobServerClient(std::move(impl));
  432. }
  433. #endif
  434. return cm::nullopt;
  435. }