test-lib-http2.cc 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949
  1. #include "gtest/gtest.h"
  2. #include <algorithm>
  3. #include <atomic>
  4. #include <chrono>
  5. #include <cstring>
  6. #include <fcntl.h>
  7. #include <iostream>
  8. #include <poll.h>
  9. #include <set>
  10. #include <string>
  11. #include <sys/socket.h>
  12. #include <thread>
  13. #include <unistd.h>
  14. #include <vector>
  15. #include "smartdns/http2.h"
  16. class LIBHTTP2 : public ::testing::Test
  17. {
  18. protected:
  19. void SetUp() override
  20. {
  21. // Create socketpair for communication
  22. if (socketpair(AF_UNIX, SOCK_STREAM, 0, socks) < 0) {
  23. perror("socketpair");
  24. FAIL() << "Failed to create socketpair";
  25. }
  26. client_sock = socks[0];
  27. server_sock = socks[1];
  28. // Set non-blocking
  29. fcntl(client_sock, F_SETFL, O_NONBLOCK);
  30. fcntl(server_sock, F_SETFL, O_NONBLOCK);
  31. }
  32. void TearDown() override
  33. {
  34. if (client_sock != -1)
  35. close(client_sock);
  36. if (server_sock != -1)
  37. close(server_sock);
  38. }
  39. int socks[2];
  40. int client_sock = -1;
  41. int server_sock = -1;
  42. // BIO callbacks
  43. static int bio_read(void *private_data, uint8_t *buf, int len)
  44. {
  45. int fd = *(int *)private_data;
  46. int ret = read(fd, buf, len);
  47. if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  48. errno = EAGAIN;
  49. return -1;
  50. }
  51. return ret;
  52. }
  53. static int bio_write(void *private_data, const uint8_t *buf, int len)
  54. {
  55. int fd = *(int *)private_data;
  56. int ret = write(fd, buf, len);
  57. if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
  58. errno = EAGAIN;
  59. return -1;
  60. }
  61. return ret;
  62. }
  63. };
  64. TEST_F(LIBHTTP2, Integrated)
  65. {
  66. std::thread server_thread([this]() {
  67. // Server logic
  68. struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
  69. ASSERT_NE(ctx, nullptr);
  70. // Handshake
  71. int handshake_attempts = 200;
  72. int ret = 0;
  73. while (handshake_attempts-- > 0) {
  74. struct pollfd pfd = {server_sock, POLLIN, 0};
  75. int poll_ret = poll(&pfd, 1, 10);
  76. if (poll_ret == 0) {
  77. continue;
  78. }
  79. ret = http2_ctx_handshake(ctx);
  80. if (ret == 1)
  81. break;
  82. if (ret < 0)
  83. break;
  84. }
  85. ASSERT_EQ(ret, 1) << "Server handshake failed";
  86. // Accept stream
  87. struct http2_stream *stream = nullptr;
  88. int max_attempts = 200;
  89. while (max_attempts-- > 0 && !stream) {
  90. struct pollfd pfd = {server_sock, POLLIN, 0};
  91. poll(&pfd, 1, 100);
  92. struct http2_poll_item items[10];
  93. int count = 0;
  94. http2_ctx_poll(ctx, items, 10, &count);
  95. for (int i = 0; i < count; i++) {
  96. if (items[i].stream == nullptr && items[i].readable) {
  97. stream = http2_ctx_accept_stream(ctx);
  98. if (stream)
  99. break;
  100. }
  101. }
  102. usleep(20000);
  103. }
  104. if (!stream) {
  105. std::cout << "Server failed to accept stream after timeout" << std::endl;
  106. }
  107. ASSERT_NE(stream, nullptr) << "Server failed to accept stream";
  108. // Read request body
  109. uint8_t request_body[4096];
  110. int request_body_len = 0;
  111. while (!http2_stream_is_end(stream) && request_body_len < (int)sizeof(request_body)) {
  112. int read_len = http2_stream_read_body(stream, request_body + request_body_len,
  113. sizeof(request_body) - request_body_len);
  114. if (read_len > 0) {
  115. request_body_len += read_len;
  116. } else {
  117. usleep(10000);
  118. }
  119. }
  120. // Send response
  121. char response[8192];
  122. int response_len = snprintf(response, sizeof(response), "Echo Response: %.*s", request_body_len, request_body);
  123. char content_length[32];
  124. snprintf(content_length, sizeof(content_length), "%d", response_len);
  125. struct http2_header_pair headers[] = {{"content-type", "text/plain"}, {"content-length", content_length}};
  126. http2_stream_set_response(stream, 200, headers, 2);
  127. http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
  128. http2_stream_close(stream);
  129. http2_ctx_close(ctx);
  130. });
  131. std::thread client_thread([this]() {
  132. usleep(500000); // Wait for server start
  133. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  134. ASSERT_NE(ctx, nullptr);
  135. // Handshake
  136. int handshake_attempts = 200;
  137. int ret = 0;
  138. while (handshake_attempts-- > 0) {
  139. struct pollfd pfd = {client_sock, POLLIN, 0};
  140. poll(&pfd, 1, 10);
  141. ret = http2_ctx_handshake(ctx);
  142. if (ret == 1)
  143. break;
  144. if (ret < 0)
  145. break;
  146. }
  147. ASSERT_EQ(ret, 1) << "Client handshake failed";
  148. // Create stream
  149. struct http2_stream *stream = http2_stream_new(ctx);
  150. ASSERT_NE(stream, nullptr);
  151. // Send request
  152. struct http2_header_pair headers[] = {
  153. {"content-type", "application/json"}, {"content-length", "27"}, {NULL, NULL}};
  154. http2_stream_set_request(stream, "POST", "/echo", NULL, headers);
  155. const char *request_body = "{\"message\":\"Hello Echo!\"}";
  156. http2_stream_write_body(stream, (const uint8_t *)request_body, strlen(request_body), 1);
  157. // Wait for response
  158. int max_attempts = 200;
  159. while (max_attempts-- > 0) {
  160. struct pollfd pfd = {client_sock, POLLIN, 0};
  161. poll(&pfd, 1, 100);
  162. struct http2_poll_item items[10];
  163. int count = 0;
  164. http2_ctx_poll(ctx, items, 10, &count);
  165. if (http2_stream_get_status(stream) > 0)
  166. break;
  167. usleep(20000);
  168. }
  169. EXPECT_EQ(http2_stream_get_status(stream), 200);
  170. // Read response
  171. uint8_t response_body[4096];
  172. int response_body_len = 0;
  173. while (!http2_stream_is_end(stream) && response_body_len < (int)sizeof(response_body)) {
  174. int read_len = http2_stream_read_body(stream, response_body + response_body_len,
  175. sizeof(response_body) - response_body_len);
  176. if (read_len > 0) {
  177. response_body_len += read_len;
  178. } else {
  179. usleep(10000);
  180. }
  181. }
  182. std::string resp((char *)response_body, response_body_len);
  183. EXPECT_NE(resp.find("Echo Response"), std::string::npos);
  184. http2_stream_close(stream);
  185. http2_ctx_close(ctx);
  186. });
  187. server_thread.join();
  188. client_thread.join();
  189. }
  190. TEST_F(LIBHTTP2, MultiStream)
  191. {
  192. const int NUM_STREAMS = 3;
  193. std::thread server_thread([this, NUM_STREAMS]() {
  194. struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
  195. ASSERT_NE(ctx, nullptr);
  196. // Handshake
  197. int handshake_attempts = 200;
  198. int ret = 0;
  199. while (handshake_attempts-- > 0) {
  200. struct pollfd pfd = {server_sock, POLLIN, 0};
  201. poll(&pfd, 1, 10);
  202. ret = http2_ctx_handshake(ctx);
  203. if (ret == 1)
  204. break;
  205. if (ret < 0)
  206. break;
  207. }
  208. ASSERT_EQ(ret, 1) << "Server handshake failed";
  209. int streams_completed = 0;
  210. int max_iterations = 500;
  211. std::set<struct http2_stream *> processed_streams;
  212. while (streams_completed < NUM_STREAMS && max_iterations-- > 0) {
  213. struct pollfd pfd = {server_sock, POLLIN, 0};
  214. poll(&pfd, 1, 100);
  215. struct http2_poll_item items[10];
  216. int count = 0;
  217. http2_ctx_poll(ctx, items, 10, &count);
  218. for (int i = 0; i < count; i++) {
  219. if (items[i].stream == nullptr && items[i].readable) {
  220. struct http2_stream *s = http2_ctx_accept_stream(ctx);
  221. } else if (items[i].stream && items[i].readable) {
  222. struct http2_stream *stream = items[i].stream;
  223. uint8_t buf[1024];
  224. http2_stream_read_body(stream, buf, sizeof(buf));
  225. if (http2_stream_is_end(stream)) {
  226. if (processed_streams.find(stream) == processed_streams.end()) {
  227. char response[256];
  228. int response_len = snprintf(response, sizeof(response), "Echo from stream %d",
  229. http2_stream_get_id(stream));
  230. char content_length[32];
  231. snprintf(content_length, sizeof(content_length), "%d", response_len);
  232. struct http2_header_pair headers[] = {{"content-type", "text/plain"},
  233. {"content-length", content_length}};
  234. http2_stream_set_response(stream, 200, headers, 2);
  235. http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
  236. streams_completed++;
  237. processed_streams.insert(stream);
  238. }
  239. }
  240. }
  241. }
  242. usleep(2000);
  243. }
  244. for (auto stream : processed_streams) {
  245. http2_stream_close(stream);
  246. }
  247. http2_ctx_close(ctx);
  248. });
  249. std::thread client_thread([this, NUM_STREAMS]() {
  250. usleep(50000);
  251. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  252. ASSERT_NE(ctx, nullptr);
  253. // Handshake
  254. int handshake_attempts = 200;
  255. int ret = 0;
  256. while (handshake_attempts-- > 0) {
  257. struct pollfd pfd = {client_sock, POLLIN, 0};
  258. poll(&pfd, 1, 10);
  259. ret = http2_ctx_handshake(ctx);
  260. if (ret == 1)
  261. break;
  262. if (ret < 0)
  263. break;
  264. }
  265. ASSERT_EQ(ret, 1) << "Client handshake failed";
  266. struct http2_stream *streams[NUM_STREAMS];
  267. for (int i = 0; i < NUM_STREAMS; i++) {
  268. streams[i] = http2_stream_new(ctx);
  269. ASSERT_NE(streams[i], nullptr);
  270. char path[64];
  271. snprintf(path, sizeof(path), "/stream%d", i);
  272. char body[128];
  273. int body_len = snprintf(body, sizeof(body), "Request from stream %d", i);
  274. char content_length[32];
  275. snprintf(content_length, sizeof(content_length), "%d", body_len);
  276. struct http2_header_pair headers[] = {
  277. {"content-type", "text/plain"}, {"content-length", content_length}, {NULL, NULL}};
  278. http2_stream_set_request(streams[i], "POST", path, NULL, headers);
  279. http2_stream_write_body(streams[i], (const uint8_t *)body, body_len, 1);
  280. }
  281. int streams_completed = 0;
  282. int max_iterations = 500;
  283. std::set<int> completed_stream_ids;
  284. while (streams_completed < NUM_STREAMS && max_iterations-- > 0) {
  285. struct pollfd pfd = {client_sock, POLLIN, 0};
  286. poll(&pfd, 1, 100);
  287. struct http2_poll_item items[10];
  288. int count = 0;
  289. http2_ctx_poll(ctx, items, 10, &count);
  290. for (int i = 0; i < count; i++) {
  291. if (items[i].stream && items[i].readable) {
  292. struct http2_stream *stream = items[i].stream;
  293. uint8_t buf[1024];
  294. http2_stream_read_body(stream, buf, sizeof(buf));
  295. if (http2_stream_is_end(stream)) {
  296. int stream_id = http2_stream_get_id(stream);
  297. if (completed_stream_ids.find(stream_id) == completed_stream_ids.end()) {
  298. completed_stream_ids.insert(stream_id);
  299. streams_completed++;
  300. }
  301. }
  302. }
  303. }
  304. usleep(2000);
  305. }
  306. EXPECT_EQ(streams_completed, NUM_STREAMS);
  307. for (int i = 0; i < NUM_STREAMS; i++) {
  308. http2_stream_close(streams[i]);
  309. }
  310. http2_ctx_close(ctx);
  311. });
  312. server_thread.join();
  313. client_thread.join();
  314. }
  315. TEST_F(LIBHTTP2, EarlyStreamCreation)
  316. {
  317. std::thread server_thread([this]() {
  318. // Server logic
  319. struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
  320. ASSERT_NE(ctx, nullptr);
  321. // Handshake
  322. int handshake_attempts = 200;
  323. int ret = 0;
  324. while (handshake_attempts-- > 0) {
  325. struct pollfd pfd = {server_sock, POLLIN, 0};
  326. int poll_ret = poll(&pfd, 1, 10);
  327. if (poll_ret == 0) {
  328. continue;
  329. }
  330. ret = http2_ctx_handshake(ctx);
  331. if (ret == 1)
  332. break;
  333. if (ret < 0)
  334. break;
  335. }
  336. ASSERT_EQ(ret, 1) << "Server handshake failed";
  337. // Accept stream
  338. struct http2_stream *stream = nullptr;
  339. int max_attempts = 200;
  340. while (max_attempts-- > 0 && !stream) {
  341. struct pollfd pfd = {server_sock, POLLIN, 0};
  342. poll(&pfd, 1, 100);
  343. struct http2_poll_item items[10];
  344. int count = 0;
  345. http2_ctx_poll(ctx, items, 10, &count);
  346. for (int i = 0; i < count; i++) {
  347. if (items[i].stream == nullptr && items[i].readable) {
  348. stream = http2_ctx_accept_stream(ctx);
  349. if (stream)
  350. break;
  351. }
  352. }
  353. usleep(20000);
  354. }
  355. ASSERT_NE(stream, nullptr) << "Server failed to accept stream";
  356. // Verify we received the request
  357. const char *method = http2_stream_get_method(stream);
  358. const char *path = http2_stream_get_path(stream);
  359. EXPECT_STREQ(method, "POST");
  360. EXPECT_STREQ(path, "/early-test");
  361. // Read request body (should be empty for GET)
  362. uint8_t request_body[4096];
  363. int request_body_len = 0;
  364. while (!http2_stream_is_end(stream) && request_body_len < (int)sizeof(request_body)) {
  365. int read_len = http2_stream_read_body(stream, request_body + request_body_len,
  366. sizeof(request_body) - request_body_len);
  367. if (read_len > 0) {
  368. request_body_len += read_len;
  369. } else {
  370. usleep(10000);
  371. }
  372. }
  373. // Send response
  374. char response[8192];
  375. int response_len = snprintf(response, sizeof(response), "Echo Response: %.*s", request_body_len, request_body);
  376. char content_length[32];
  377. snprintf(content_length, sizeof(content_length), "%d", response_len);
  378. struct http2_header_pair headers[] = {
  379. {"content-type", "text/plain"}, {"content-length", content_length}, {NULL, NULL}};
  380. http2_stream_set_response(stream, 200, headers, 2);
  381. http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
  382. http2_stream_close(stream);
  383. http2_ctx_close(ctx);
  384. });
  385. std::thread client_thread([this]() {
  386. usleep(50000); // Wait for server start
  387. // Create client context
  388. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  389. ASSERT_NE(ctx, nullptr);
  390. // IMPORTANT: Create stream and send request BEFORE handshake completes
  391. // This tests that the HEADERS frame is buffered and sent after handshake
  392. struct http2_stream *stream = http2_stream_new(ctx);
  393. ASSERT_NE(stream, nullptr);
  394. // Send request immediately (before handshake)
  395. struct http2_header_pair headers[] = {{"user-agent", "test-client"}, {NULL, NULL}};
  396. int ret = http2_stream_set_request(stream, "POST", "/early-test", NULL, headers);
  397. EXPECT_EQ(ret, 0) << "Failed to set request";
  398. const char *request_body = "test echo";
  399. http2_stream_write_body(stream, (const uint8_t *)request_body, strlen(request_body), 1);
  400. // Now complete handshake
  401. int handshake_attempts = 200;
  402. ret = 0;
  403. while (handshake_attempts-- > 0) {
  404. struct pollfd pfd = {client_sock, POLLIN, 0};
  405. poll(&pfd, 1, 10);
  406. ret = http2_ctx_handshake(ctx);
  407. if (ret == 1)
  408. break;
  409. if (ret < 0)
  410. break;
  411. }
  412. ASSERT_EQ(ret, 1) << "Client handshake failed";
  413. // Wait for response
  414. int max_attempts = 200;
  415. while (max_attempts-- > 0) {
  416. struct pollfd pfd = {client_sock, POLLIN, 0};
  417. poll(&pfd, 1, 100);
  418. struct http2_poll_item items[10];
  419. int count = 0;
  420. http2_ctx_poll(ctx, items, 10, &count);
  421. if (http2_stream_get_status(stream) > 0)
  422. break;
  423. usleep(20000);
  424. }
  425. EXPECT_EQ(http2_stream_get_status(stream), 200);
  426. // Read response
  427. uint8_t response_body[4096];
  428. int response_body_len = 0;
  429. while (!http2_stream_is_end(stream) && response_body_len < (int)sizeof(response_body)) {
  430. int read_len = http2_stream_read_body(stream, response_body + response_body_len,
  431. sizeof(response_body) - response_body_len);
  432. if (read_len > 0) {
  433. response_body_len += read_len;
  434. } else {
  435. usleep(10000);
  436. }
  437. }
  438. std::string resp((char *)response_body, response_body_len);
  439. EXPECT_NE(resp.find("Echo Response"), std::string::npos);
  440. EXPECT_NE(resp.find("test echo"), std::string::npos);
  441. http2_stream_close(stream);
  442. http2_ctx_close(ctx);
  443. });
  444. server_thread.join();
  445. client_thread.join();
  446. }
  447. TEST_F(LIBHTTP2, ServerLoopTerminationOnDisconnect)
  448. {
  449. std::thread server_thread([this]() {
  450. struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
  451. ASSERT_NE(ctx, nullptr);
  452. // Handshake
  453. int handshake_attempts = 200;
  454. int ret = 0;
  455. while (handshake_attempts-- > 0) {
  456. struct pollfd pfd = {server_sock, POLLIN, 0};
  457. int poll_ret = poll(&pfd, 1, 10);
  458. if (poll_ret == 0) {
  459. continue;
  460. }
  461. ret = http2_ctx_handshake(ctx);
  462. if (ret == 1)
  463. break;
  464. if (ret < 0)
  465. break;
  466. }
  467. ASSERT_EQ(ret, 1) << "Server handshake failed";
  468. // Accept stream
  469. struct http2_stream *stream = nullptr;
  470. int max_attempts = 200;
  471. while (max_attempts-- > 0 && !stream) {
  472. struct pollfd pfd = {server_sock, POLLIN, 0};
  473. poll(&pfd, 1, 100);
  474. struct http2_poll_item items[10];
  475. int count = 0;
  476. http2_ctx_poll(ctx, items, 10, &count);
  477. for (int i = 0; i < count; i++) {
  478. if (items[i].stream == nullptr && items[i].readable) {
  479. stream = http2_ctx_accept_stream(ctx);
  480. if (stream)
  481. break;
  482. }
  483. }
  484. usleep(20000);
  485. }
  486. ASSERT_NE(stream, nullptr) << "Server failed to accept stream";
  487. // Read request body until EOF
  488. uint8_t buf[1024];
  489. int loop_count = 0;
  490. while (loop_count++ < 100) {
  491. struct http2_poll_item items[10];
  492. int count = 0;
  493. http2_ctx_poll(ctx, items, 10, &count);
  494. int data_read = 0;
  495. for (int i = 0; i < count; i++) {
  496. if (items[i].stream == stream && items[i].readable) {
  497. int ret = http2_stream_read_body(stream, buf, sizeof(buf));
  498. if (ret > 0) {
  499. data_read = 1;
  500. } else if (ret == 0) {
  501. // EOF received
  502. data_read = 1;
  503. }
  504. }
  505. }
  506. if (!data_read && http2_stream_is_end(stream)) {
  507. // If we are here, it means poll returned 0 items (or stream not readable),
  508. // which is correct behavior after EOF is consumed.
  509. // If the bug exists, poll would keep returning readable stream, and we would keep reading 0 bytes.
  510. break;
  511. }
  512. usleep(10000);
  513. }
  514. EXPECT_LT(loop_count, 100) << "Server loop did not terminate (infinite loop detected)";
  515. http2_ctx_close(ctx);
  516. });
  517. std::thread client_thread([this]() {
  518. usleep(50000);
  519. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  520. ASSERT_NE(ctx, nullptr);
  521. int handshake_attempts = 200;
  522. int ret = 0;
  523. while (handshake_attempts-- > 0) {
  524. struct pollfd pfd = {client_sock, POLLIN, 0};
  525. poll(&pfd, 1, 10);
  526. ret = http2_ctx_handshake(ctx);
  527. if (ret != 0) {
  528. break;
  529. }
  530. }
  531. ASSERT_EQ(ret, 1);
  532. struct http2_stream *stream = http2_stream_new(ctx);
  533. ASSERT_NE(stream, nullptr);
  534. struct http2_header_pair headers[] = {{"content-type", "text/plain"}, {NULL, NULL}};
  535. http2_stream_set_request(stream, "POST", "/test", NULL, headers);
  536. http2_stream_write_body(stream, (const uint8_t *)"test", 4, 1);
  537. http2_stream_close(stream);
  538. http2_ctx_close(ctx);
  539. });
  540. server_thread.join();
  541. client_thread.join();
  542. }
  543. TEST_F(LIBHTTP2, StreamClose)
  544. {
  545. std::thread server_thread([this]() {
  546. struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
  547. ASSERT_NE(ctx, nullptr);
  548. // Handshake
  549. int handshake_attempts = 200;
  550. int ret = 0;
  551. while (handshake_attempts-- > 0) {
  552. struct pollfd pfd = {server_sock, POLLIN, 0};
  553. int poll_ret = poll(&pfd, 1, 10);
  554. if (poll_ret == 0) {
  555. continue;
  556. }
  557. ret = http2_ctx_handshake(ctx);
  558. if (ret == 1)
  559. break;
  560. if (ret < 0)
  561. break;
  562. }
  563. ASSERT_EQ(ret, 1) << "Server handshake failed";
  564. // Accept stream
  565. struct http2_stream *stream = nullptr;
  566. int max_attempts = 200;
  567. while (max_attempts-- > 0 && !stream) {
  568. struct pollfd pfd = {server_sock, POLLIN, 0};
  569. poll(&pfd, 1, 100);
  570. struct http2_poll_item items[10];
  571. int count = 0;
  572. http2_ctx_poll(ctx, items, 10, &count);
  573. for (int i = 0; i < count; i++) {
  574. if (items[i].stream == nullptr && items[i].readable) {
  575. stream = http2_ctx_accept_stream(ctx);
  576. if (stream)
  577. break;
  578. }
  579. }
  580. usleep(20000);
  581. }
  582. ASSERT_NE(stream, nullptr) << "Server failed to accept stream";
  583. // Read request and send response
  584. uint8_t buf[1024];
  585. http2_stream_read_body(stream, buf, sizeof(buf));
  586. http2_stream_set_response(stream, 200, NULL, 0);
  587. http2_stream_write_body(stream, (const uint8_t *)"OK", 2, 1);
  588. http2_stream_close(stream);
  589. http2_ctx_close(ctx);
  590. });
  591. std::thread client_thread([this]() {
  592. usleep(50000);
  593. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  594. ASSERT_NE(ctx, nullptr);
  595. // Handshake
  596. int handshake_attempts = 200;
  597. int ret = 0;
  598. while (handshake_attempts-- > 0) {
  599. struct pollfd pfd = {client_sock, POLLIN, 0};
  600. poll(&pfd, 1, 10);
  601. ret = http2_ctx_handshake(ctx);
  602. if (ret == 1)
  603. break;
  604. if (ret < 0)
  605. break;
  606. }
  607. ASSERT_EQ(ret, 1) << "Client handshake failed";
  608. // Create stream
  609. struct http2_stream *stream = http2_stream_new(ctx);
  610. ASSERT_NE(stream, nullptr);
  611. // Send request
  612. http2_stream_set_request(stream, "GET", "/test", NULL, NULL);
  613. http2_stream_write_body(stream, NULL, 0, 1);
  614. // Wait for response
  615. int max_attempts = 200;
  616. while (max_attempts-- > 0) {
  617. struct pollfd pfd = {client_sock, POLLIN, 0};
  618. poll(&pfd, 1, 100);
  619. struct http2_poll_item items[10];
  620. int count = 0;
  621. http2_ctx_poll(ctx, items, 10, &count);
  622. if (http2_stream_get_status(stream) > 0)
  623. break;
  624. usleep(20000);
  625. }
  626. // Close the stream explicitly
  627. http2_stream_get(stream); // Keep reference for reading after close
  628. http2_stream_close(stream);
  629. // Verify stream is marked as closed (should still be able to read)
  630. // After close, the stream should still be readable until all data is consumed
  631. EXPECT_FALSE(http2_stream_is_end(stream)); // Should not be end yet since we haven't read response
  632. // Read response (should still work after close)
  633. uint8_t buf[1024];
  634. int read_len = http2_stream_read_body(stream, buf, sizeof(buf));
  635. EXPECT_GE(read_len, 0); // Should be able to read
  636. // After reading all data, stream should be end
  637. while (!http2_stream_is_end(stream)) {
  638. read_len = http2_stream_read_body(stream, buf, sizeof(buf));
  639. if (read_len <= 0) {
  640. break;
  641. }
  642. }
  643. EXPECT_TRUE(http2_stream_is_end(stream)); // Should be end after reading all data
  644. http2_stream_put(stream);
  645. http2_ctx_put(ctx);
  646. });
  647. server_thread.join();
  648. client_thread.join();
  649. }
  650. TEST_F(LIBHTTP2, ReferenceCountingNormal)
  651. {
  652. // Test normal reference counting: ctx normal, stream released by business
  653. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  654. ASSERT_NE(ctx, nullptr);
  655. // Create a stream (already has refcount = 1)
  656. struct http2_stream *stream = http2_stream_new(ctx);
  657. ASSERT_NE(stream, nullptr);
  658. // Close context (should not free stream because business still holds reference)
  659. http2_ctx_close(ctx);
  660. // Business releases reference
  661. http2_stream_close(stream);
  662. // Now stream should be freed
  663. // We can't directly check, but no crash should occur
  664. }
  665. TEST_F(LIBHTTP2, ReferenceCountingContextError)
  666. {
  667. // Test reference counting when ctx has error but stream is still referenced by business
  668. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  669. ASSERT_NE(ctx, nullptr);
  670. // Create a stream
  671. struct http2_stream *stream = http2_stream_new(ctx);
  672. ASSERT_NE(stream, nullptr);
  673. // Simulate context error by closing the socket (connection broken)
  674. close(client_sock);
  675. client_sock = -1;
  676. // Close context (should handle error gracefully)
  677. http2_ctx_close(ctx);
  678. // Business still holds reference, should be able to release it
  679. http2_stream_close(stream);
  680. // No crash should occur
  681. }
  682. TEST_F(LIBHTTP2, StressTest)
  683. {
  684. const int NUM_STREAMS = 1024;
  685. std::atomic<int> server_processed(0);
  686. std::atomic<int> client_completed(0);
  687. std::atomic<bool> test_completed(false);
  688. std::thread server_thread([this, NUM_STREAMS, &server_processed, &test_completed]() {
  689. struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
  690. ASSERT_NE(ctx, nullptr);
  691. // Handshake
  692. auto start_time = std::chrono::steady_clock::now();
  693. int ret = 0;
  694. while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(5)) {
  695. struct pollfd pfd = {server_sock, POLLIN, 0};
  696. int poll_ret = poll(&pfd, 1, 10);
  697. if (poll_ret == 0) {
  698. continue;
  699. }
  700. ret = http2_ctx_handshake(ctx);
  701. if (ret == 1)
  702. break;
  703. if (ret < 0)
  704. break;
  705. }
  706. ASSERT_EQ(ret, 1) << "Server handshake failed";
  707. std::vector<struct http2_stream *> streams;
  708. start_time = std::chrono::steady_clock::now();
  709. while (!test_completed && std::chrono::steady_clock::now() - start_time < std::chrono::seconds(30)) {
  710. struct pollfd pfd = {server_sock, POLLIN, 0};
  711. poll(&pfd, 1, 10);
  712. struct http2_poll_item items[64];
  713. int count = 0;
  714. http2_ctx_poll(ctx, items, 64, &count);
  715. for (int i = 0; i < count; i++) {
  716. if (items[i].stream == nullptr && items[i].readable) {
  717. struct http2_stream *stream = http2_ctx_accept_stream(ctx);
  718. if (stream) {
  719. streams.push_back(stream);
  720. }
  721. } else if (items[i].stream && items[i].readable) {
  722. struct http2_stream *stream = items[i].stream;
  723. uint8_t buf[1024];
  724. while (http2_stream_read_body(stream, buf, sizeof(buf)) > 0)
  725. ;
  726. if (http2_stream_is_end(stream)) {
  727. char response[256];
  728. int response_len = snprintf(response, sizeof(response), "Echo %d", http2_stream_get_id(stream));
  729. char content_length[32];
  730. snprintf(content_length, sizeof(content_length), "%d", response_len);
  731. struct http2_header_pair headers[] = {{"content-type", "text/plain"},
  732. {"content-length", content_length}};
  733. http2_stream_set_response(stream, 200, headers, 2);
  734. http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
  735. server_processed++;
  736. }
  737. }
  738. }
  739. }
  740. for (auto stream : streams) {
  741. http2_stream_close(stream);
  742. }
  743. http2_ctx_close(ctx);
  744. });
  745. std::thread client_thread([this, NUM_STREAMS, &client_completed, &test_completed]() {
  746. usleep(50000);
  747. struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
  748. ASSERT_NE(ctx, nullptr);
  749. // Handshake
  750. auto start_time = std::chrono::steady_clock::now();
  751. int ret = 0;
  752. while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(5)) {
  753. struct pollfd pfd = {client_sock, POLLIN, 0};
  754. poll(&pfd, 1, 10);
  755. ret = http2_ctx_handshake(ctx);
  756. if (ret == 1)
  757. break;
  758. if (ret < 0)
  759. break;
  760. }
  761. ASSERT_EQ(ret, 1) << "Client handshake failed";
  762. std::vector<struct http2_stream *> streams;
  763. streams.reserve(NUM_STREAMS);
  764. std::set<int> completed_ids;
  765. auto process_events = [&](int timeout_ms) {
  766. struct pollfd pfd = {client_sock, POLLIN, 0};
  767. poll(&pfd, 1, timeout_ms);
  768. struct http2_poll_item items[64];
  769. int count = 0;
  770. http2_ctx_poll(ctx, items, 64, &count);
  771. for (int i = 0; i < count; i++) {
  772. if (items[i].stream && items[i].readable) {
  773. struct http2_stream *stream = items[i].stream;
  774. uint8_t buf[1024];
  775. while (http2_stream_read_body(stream, buf, sizeof(buf)) > 0)
  776. ;
  777. if (http2_stream_is_end(stream)) {
  778. int id = http2_stream_get_id(stream);
  779. if (completed_ids.find(id) == completed_ids.end()) {
  780. completed_ids.insert(id);
  781. client_completed++;
  782. }
  783. }
  784. }
  785. }
  786. };
  787. for (int i = 0; i < NUM_STREAMS; i++) {
  788. struct http2_stream *stream = http2_stream_new(ctx);
  789. if (stream) {
  790. streams.push_back(stream);
  791. char path[64];
  792. snprintf(path, sizeof(path), "/stream%d", i);
  793. char body[64];
  794. int body_len = snprintf(body, sizeof(body), "Req %d", i);
  795. struct http2_header_pair headers[] = {{"content-type", "text/plain"}, {NULL, NULL}};
  796. http2_stream_set_request(stream, "POST", path, NULL, headers);
  797. http2_stream_write_body(stream, (const uint8_t *)body, body_len, 1);
  798. }
  799. // Process events periodically to prevent deadlock/buffer overflow
  800. if (i % 10 == 0) {
  801. process_events(0);
  802. }
  803. }
  804. ASSERT_EQ(streams.size(), NUM_STREAMS);
  805. start_time = std::chrono::steady_clock::now();
  806. while (client_completed < NUM_STREAMS &&
  807. std::chrono::steady_clock::now() - start_time < std::chrono::seconds(30)) {
  808. process_events(10);
  809. }
  810. EXPECT_EQ(client_completed, NUM_STREAMS);
  811. for (auto stream : streams) {
  812. http2_stream_close(stream);
  813. }
  814. http2_ctx_close(ctx);
  815. test_completed = true;
  816. });
  817. server_thread.join();
  818. client_thread.join();
  819. }