bufq.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. /***************************************************************************
  2. * _ _ ____ _
  3. * Project ___| | | | _ \| |
  4. * / __| | | | |_) | |
  5. * | (__| |_| | _ <| |___
  6. * \___|\___/|_| \_\_____|
  7. *
  8. * Copyright (C) Daniel Stenberg, <[email protected]>, et al.
  9. *
  10. * This software is licensed as described in the file COPYING, which
  11. * you should have received as part of this distribution. The terms
  12. * are also available at https://curl.se/docs/copyright.html.
  13. *
  14. * You may opt to use, copy, modify, merge, publish, distribute and/or sell
  15. * copies of the Software, and permit persons to whom the Software is
  16. * furnished to do so, under the terms of the COPYING file.
  17. *
  18. * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
  19. * KIND, either express or implied.
  20. *
  21. * SPDX-License-Identifier: curl
  22. *
  23. ***************************************************************************/
  24. #include "curl_setup.h"
  25. #include "bufq.h"
  26. /* The last 3 #include files should be in this order */
  27. #include "curl_printf.h"
  28. #include "curl_memory.h"
  29. #include "memdebug.h"
  30. static bool chunk_is_empty(const struct buf_chunk *chunk)
  31. {
  32. return chunk->r_offset >= chunk->w_offset;
  33. }
  34. static bool chunk_is_full(const struct buf_chunk *chunk)
  35. {
  36. return chunk->w_offset >= chunk->dlen;
  37. }
  38. static size_t chunk_len(const struct buf_chunk *chunk)
  39. {
  40. return chunk->w_offset - chunk->r_offset;
  41. }
  42. static void chunk_reset(struct buf_chunk *chunk)
  43. {
  44. chunk->next = NULL;
  45. chunk->r_offset = chunk->w_offset = 0;
  46. }
  47. static size_t chunk_append(struct buf_chunk *chunk,
  48. const unsigned char *buf, size_t len)
  49. {
  50. unsigned char *p = &chunk->x.data[chunk->w_offset];
  51. size_t n = chunk->dlen - chunk->w_offset;
  52. DEBUGASSERT(chunk->dlen >= chunk->w_offset);
  53. if(n) {
  54. n = CURLMIN(n, len);
  55. memcpy(p, buf, n);
  56. chunk->w_offset += n;
  57. }
  58. return n;
  59. }
  60. static size_t chunk_read(struct buf_chunk *chunk,
  61. unsigned char *buf, size_t len)
  62. {
  63. unsigned char *p = &chunk->x.data[chunk->r_offset];
  64. size_t n = chunk->w_offset - chunk->r_offset;
  65. DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
  66. if(!n) {
  67. return 0;
  68. }
  69. else if(n <= len) {
  70. memcpy(buf, p, n);
  71. chunk->r_offset = chunk->w_offset = 0;
  72. return n;
  73. }
  74. else {
  75. memcpy(buf, p, len);
  76. chunk->r_offset += len;
  77. return len;
  78. }
  79. }
  80. static CURLcode chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
  81. Curl_bufq_reader *reader,
  82. void *reader_ctx, size_t *pnread)
  83. {
  84. unsigned char *p = &chunk->x.data[chunk->w_offset];
  85. size_t n = chunk->dlen - chunk->w_offset; /* free amount */
  86. CURLcode result;
  87. *pnread = 0;
  88. DEBUGASSERT(chunk->dlen >= chunk->w_offset);
  89. if(!n)
  90. return CURLE_AGAIN;
  91. if(max_len && n > max_len)
  92. n = max_len;
  93. result = reader(reader_ctx, p, n, pnread);
  94. if(!result) {
  95. DEBUGASSERT(*pnread <= n);
  96. chunk->w_offset += *pnread;
  97. }
  98. return result;
  99. }
  100. static void chunk_peek(const struct buf_chunk *chunk,
  101. const unsigned char **pbuf, size_t *plen)
  102. {
  103. DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
  104. *pbuf = &chunk->x.data[chunk->r_offset];
  105. *plen = chunk->w_offset - chunk->r_offset;
  106. }
  107. static void chunk_peek_at(const struct buf_chunk *chunk, size_t offset,
  108. const unsigned char **pbuf, size_t *plen)
  109. {
  110. offset += chunk->r_offset;
  111. DEBUGASSERT(chunk->w_offset >= offset);
  112. *pbuf = &chunk->x.data[offset];
  113. *plen = chunk->w_offset - offset;
  114. }
  115. static size_t chunk_skip(struct buf_chunk *chunk, size_t amount)
  116. {
  117. size_t n = chunk->w_offset - chunk->r_offset;
  118. DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
  119. if(n) {
  120. n = CURLMIN(n, amount);
  121. chunk->r_offset += n;
  122. if(chunk->r_offset == chunk->w_offset)
  123. chunk->r_offset = chunk->w_offset = 0;
  124. }
  125. return n;
  126. }
  127. static void chunk_list_free(struct buf_chunk **anchor)
  128. {
  129. struct buf_chunk *chunk;
  130. while(*anchor) {
  131. chunk = *anchor;
  132. *anchor = chunk->next;
  133. free(chunk);
  134. }
  135. }
  136. void Curl_bufcp_init(struct bufc_pool *pool,
  137. size_t chunk_size, size_t spare_max)
  138. {
  139. DEBUGASSERT(chunk_size > 0);
  140. DEBUGASSERT(spare_max > 0);
  141. memset(pool, 0, sizeof(*pool));
  142. pool->chunk_size = chunk_size;
  143. pool->spare_max = spare_max;
  144. }
  145. static CURLcode bufcp_take(struct bufc_pool *pool,
  146. struct buf_chunk **pchunk)
  147. {
  148. struct buf_chunk *chunk = NULL;
  149. if(pool->spare) {
  150. chunk = pool->spare;
  151. pool->spare = chunk->next;
  152. --pool->spare_count;
  153. chunk_reset(chunk);
  154. *pchunk = chunk;
  155. return CURLE_OK;
  156. }
  157. chunk = calloc(1, sizeof(*chunk) + pool->chunk_size);
  158. if(!chunk) {
  159. *pchunk = NULL;
  160. return CURLE_OUT_OF_MEMORY;
  161. }
  162. chunk->dlen = pool->chunk_size;
  163. *pchunk = chunk;
  164. return CURLE_OK;
  165. }
  166. static void bufcp_put(struct bufc_pool *pool,
  167. struct buf_chunk *chunk)
  168. {
  169. if(pool->spare_count >= pool->spare_max) {
  170. free(chunk);
  171. }
  172. else {
  173. chunk_reset(chunk);
  174. chunk->next = pool->spare;
  175. pool->spare = chunk;
  176. ++pool->spare_count;
  177. }
  178. }
  179. void Curl_bufcp_free(struct bufc_pool *pool)
  180. {
  181. chunk_list_free(&pool->spare);
  182. pool->spare_count = 0;
  183. }
  184. static void bufq_init(struct bufq *q, struct bufc_pool *pool,
  185. size_t chunk_size, size_t max_chunks, int opts)
  186. {
  187. DEBUGASSERT(chunk_size > 0);
  188. DEBUGASSERT(max_chunks > 0);
  189. memset(q, 0, sizeof(*q));
  190. q->chunk_size = chunk_size;
  191. q->max_chunks = max_chunks;
  192. q->pool = pool;
  193. q->opts = opts;
  194. }
  /* Initialize `q` without a pool, using the given chunk size, chunk
   * limit and option flags. */
  void Curl_bufq_init2(struct bufq *q, size_t chunk_size, size_t max_chunks,
                       int opts)
  {
    struct bufc_pool *no_pool = NULL;

    bufq_init(q, no_pool, chunk_size, max_chunks, opts);
  }
  200. void Curl_bufq_init(struct bufq *q, size_t chunk_size, size_t max_chunks)
  201. {
  202. bufq_init(q, NULL, chunk_size, max_chunks, BUFQ_OPT_NONE);
  203. }
  204. void Curl_bufq_initp(struct bufq *q, struct bufc_pool *pool,
  205. size_t max_chunks, int opts)
  206. {
  207. bufq_init(q, pool, pool->chunk_size, max_chunks, opts);
  208. }
  209. void Curl_bufq_free(struct bufq *q)
  210. {
  211. chunk_list_free(&q->head);
  212. chunk_list_free(&q->spare);
  213. q->tail = NULL;
  214. q->chunk_count = 0;
  215. }
  216. void Curl_bufq_reset(struct bufq *q)
  217. {
  218. struct buf_chunk *chunk;
  219. while(q->head) {
  220. chunk = q->head;
  221. q->head = chunk->next;
  222. chunk->next = q->spare;
  223. q->spare = chunk;
  224. }
  225. q->tail = NULL;
  226. }
  227. size_t Curl_bufq_len(const struct bufq *q)
  228. {
  229. const struct buf_chunk *chunk = q->head;
  230. size_t len = 0;
  231. while(chunk) {
  232. len += chunk_len(chunk);
  233. chunk = chunk->next;
  234. }
  235. return len;
  236. }
  237. bool Curl_bufq_is_empty(const struct bufq *q)
  238. {
  239. return !q->head || chunk_is_empty(q->head);
  240. }
  241. bool Curl_bufq_is_full(const struct bufq *q)
  242. {
  243. if(!q->tail || q->spare)
  244. return FALSE;
  245. if(q->chunk_count < q->max_chunks)
  246. return FALSE;
  247. if(q->chunk_count > q->max_chunks)
  248. return TRUE;
  249. /* we have no spares and cannot make more, is the tail full? */
  250. return chunk_is_full(q->tail);
  251. }
  252. static struct buf_chunk *get_spare(struct bufq *q)
  253. {
  254. struct buf_chunk *chunk = NULL;
  255. if(q->spare) {
  256. chunk = q->spare;
  257. q->spare = chunk->next;
  258. chunk_reset(chunk);
  259. return chunk;
  260. }
  261. if(q->chunk_count >= q->max_chunks && (!(q->opts & BUFQ_OPT_SOFT_LIMIT)))
  262. return NULL;
  263. if(q->pool) {
  264. if(bufcp_take(q->pool, &chunk))
  265. return NULL;
  266. ++q->chunk_count;
  267. return chunk;
  268. }
  269. else {
  270. chunk = calloc(1, sizeof(*chunk) + q->chunk_size);
  271. if(!chunk)
  272. return NULL;
  273. chunk->dlen = q->chunk_size;
  274. ++q->chunk_count;
  275. return chunk;
  276. }
  277. }
  278. static void prune_head(struct bufq *q)
  279. {
  280. struct buf_chunk *chunk;
  281. while(q->head && chunk_is_empty(q->head)) {
  282. chunk = q->head;
  283. q->head = chunk->next;
  284. if(q->tail == chunk)
  285. q->tail = q->head;
  286. if(q->pool) {
  287. bufcp_put(q->pool, chunk);
  288. --q->chunk_count;
  289. }
  290. else if((q->chunk_count > q->max_chunks) ||
  291. (q->opts & BUFQ_OPT_NO_SPARES)) {
  292. /* SOFT_LIMIT allowed us more than max. free spares until
  293. * we are at max again. Or free them if we are configured
  294. * to not use spares. */
  295. free(chunk);
  296. --q->chunk_count;
  297. }
  298. else {
  299. chunk->next = q->spare;
  300. q->spare = chunk;
  301. }
  302. }
  303. }
  304. static struct buf_chunk *get_non_full_tail(struct bufq *q)
  305. {
  306. struct buf_chunk *chunk;
  307. if(q->tail && !chunk_is_full(q->tail))
  308. return q->tail;
  309. chunk = get_spare(q);
  310. if(chunk) {
  311. /* new tail, and possibly new head */
  312. if(q->tail) {
  313. q->tail->next = chunk;
  314. q->tail = chunk;
  315. }
  316. else {
  317. DEBUGASSERT(!q->head);
  318. q->head = q->tail = chunk;
  319. }
  320. }
  321. return chunk;
  322. }
  323. CURLcode Curl_bufq_write(struct bufq *q,
  324. const unsigned char *buf, size_t len,
  325. size_t *pnwritten)
  326. {
  327. struct buf_chunk *tail;
  328. size_t n;
  329. DEBUGASSERT(q->max_chunks > 0);
  330. *pnwritten = 0;
  331. while(len) {
  332. tail = get_non_full_tail(q);
  333. if(!tail) {
  334. if((q->chunk_count < q->max_chunks) || (q->opts & BUFQ_OPT_SOFT_LIMIT))
  335. /* should have gotten a tail, but did not */
  336. return CURLE_OUT_OF_MEMORY;
  337. break;
  338. }
  339. n = chunk_append(tail, buf, len);
  340. if(!n)
  341. break;
  342. *pnwritten += n;
  343. buf += n;
  344. len -= n;
  345. }
  346. return (!*pnwritten && len) ? CURLE_AGAIN : CURLE_OK;
  347. }
  348. CURLcode Curl_bufq_cwrite(struct bufq *q,
  349. const char *buf, size_t len,
  350. size_t *pnwritten)
  351. {
  352. return Curl_bufq_write(q, (const unsigned char *)buf, len, pnwritten);
  353. }
  354. CURLcode Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
  355. size_t *pnread)
  356. {
  357. *pnread = 0;
  358. while(len && q->head) {
  359. size_t n = chunk_read(q->head, buf, len);
  360. if(n) {
  361. *pnread += n;
  362. buf += n;
  363. len -= n;
  364. }
  365. prune_head(q);
  366. }
  367. return (!*pnread) ? CURLE_AGAIN : CURLE_OK;
  368. }
  369. CURLcode Curl_bufq_cread(struct bufq *q, char *buf, size_t len,
  370. size_t *pnread)
  371. {
  372. return Curl_bufq_read(q, (unsigned char *)buf, len, pnread);
  373. }
  374. bool Curl_bufq_peek(struct bufq *q,
  375. const unsigned char **pbuf, size_t *plen)
  376. {
  377. if(q->head && chunk_is_empty(q->head)) {
  378. prune_head(q);
  379. }
  380. if(q->head && !chunk_is_empty(q->head)) {
  381. chunk_peek(q->head, pbuf, plen);
  382. return TRUE;
  383. }
  384. *pbuf = NULL;
  385. *plen = 0;
  386. return FALSE;
  387. }
  388. bool Curl_bufq_peek_at(struct bufq *q, size_t offset,
  389. const unsigned char **pbuf, size_t *plen)
  390. {
  391. struct buf_chunk *c = q->head;
  392. size_t clen;
  393. while(c) {
  394. clen = chunk_len(c);
  395. if(!clen)
  396. break;
  397. if(offset >= clen) {
  398. offset -= clen;
  399. c = c->next;
  400. continue;
  401. }
  402. chunk_peek_at(c, offset, pbuf, plen);
  403. return TRUE;
  404. }
  405. *pbuf = NULL;
  406. *plen = 0;
  407. return FALSE;
  408. }
  409. void Curl_bufq_skip(struct bufq *q, size_t amount)
  410. {
  411. size_t n;
  412. while(amount && q->head) {
  413. n = chunk_skip(q->head, amount);
  414. amount -= n;
  415. prune_head(q);
  416. }
  417. }
  418. CURLcode Curl_bufq_pass(struct bufq *q, Curl_bufq_writer *writer,
  419. void *writer_ctx, size_t *pwritten)
  420. {
  421. const unsigned char *buf;
  422. size_t blen;
  423. CURLcode result = CURLE_OK;
  424. *pwritten = 0;
  425. while(Curl_bufq_peek(q, &buf, &blen)) {
  426. size_t chunk_written;
  427. result = writer(writer_ctx, buf, blen, &chunk_written);
  428. if(result) {
  429. if((result == CURLE_AGAIN) && *pwritten) {
  430. /* blocked on subsequent write, report success */
  431. result = CURLE_OK;
  432. }
  433. break;
  434. }
  435. if(!chunk_written) {
  436. if(!*pwritten) {
  437. /* treat as blocked */
  438. result = CURLE_AGAIN;
  439. }
  440. break;
  441. }
  442. *pwritten += chunk_written;
  443. Curl_bufq_skip(q, chunk_written);
  444. }
  445. return result;
  446. }
  447. CURLcode Curl_bufq_write_pass(struct bufq *q,
  448. const unsigned char *buf, size_t len,
  449. Curl_bufq_writer *writer, void *writer_ctx,
  450. size_t *pwritten)
  451. {
  452. CURLcode result = CURLE_OK;
  453. size_t n;
  454. *pwritten = 0;
  455. while(len) {
  456. if(Curl_bufq_is_full(q)) {
  457. /* try to make room in case we are full */
  458. result = Curl_bufq_pass(q, writer, writer_ctx, &n);
  459. if(result) {
  460. if(result != CURLE_AGAIN) {
  461. /* real error, fail */
  462. return result;
  463. }
  464. /* would block, bufq is full, give up */
  465. break;
  466. }
  467. }
  468. /* Add to bufq as much as there is room for */
  469. result = Curl_bufq_write(q, buf, len, &n);
  470. if(result) {
  471. if(result != CURLE_AGAIN)
  472. /* real error, fail */
  473. return result;
  474. if((result == CURLE_AGAIN) && *pwritten)
  475. /* we did write successfully before */
  476. result = CURLE_OK;
  477. return result;
  478. }
  479. else if(n == 0)
  480. /* edge case of writer returning 0 (and len is >0)
  481. * break or we might enter an infinite loop here */
  482. break;
  483. /* Track what we added to bufq */
  484. buf += n;
  485. len -= n;
  486. *pwritten += n;
  487. }
  488. return (!*pwritten && len) ? CURLE_AGAIN : CURLE_OK;
  489. }
  490. CURLcode Curl_bufq_sipn(struct bufq *q, size_t max_len,
  491. Curl_bufq_reader *reader, void *reader_ctx,
  492. size_t *pnread)
  493. {
  494. struct buf_chunk *tail = NULL;
  495. *pnread = 0;
  496. tail = get_non_full_tail(q);
  497. if(!tail) {
  498. if(q->chunk_count < q->max_chunks)
  499. return CURLE_OUT_OF_MEMORY;
  500. /* full, blocked */
  501. return CURLE_AGAIN;
  502. }
  503. return chunk_slurpn(tail, max_len, reader, reader_ctx, pnread);
  504. }
  505. /**
  506. * Read up to `max_len` bytes and append it to the end of the buffer queue.
  507. * if `max_len` is 0, no limit is imposed and the call behaves exactly
  508. * the same as `Curl_bufq_slurp()`.
  509. * Returns the total amount of buf read (may be 0) in `pnread` or error
  510. * Note that even in case of an error chunks may have been read and
  511. * the buffer queue will have different length than before.
  512. */
  513. static CURLcode bufq_slurpn(struct bufq *q, size_t max_len,
  514. Curl_bufq_reader *reader, void *reader_ctx,
  515. size_t *pnread)
  516. {
  517. CURLcode result;
  518. *pnread = 0;
  519. while(1) {
  520. size_t n;
  521. result = Curl_bufq_sipn(q, max_len, reader, reader_ctx, &n);
  522. if(result) {
  523. if(!*pnread || result != CURLE_AGAIN) {
  524. /* blocked on first read or real error, fail */
  525. return result;
  526. }
  527. result = CURLE_OK;
  528. break;
  529. }
  530. else if(n == 0) {
  531. /* eof */
  532. result = CURLE_OK;
  533. break;
  534. }
  535. *pnread += n;
  536. if(max_len) {
  537. DEBUGASSERT(n <= max_len);
  538. max_len -= n;
  539. if(!max_len)
  540. break;
  541. }
  542. /* give up slurping when we get less bytes than we asked for */
  543. if(q->tail && !chunk_is_full(q->tail))
  544. break;
  545. }
  546. return result;
  547. }
  548. CURLcode Curl_bufq_slurp(struct bufq *q, Curl_bufq_reader *reader,
  549. void *reader_ctx, size_t *pnread)
  550. {
  551. return bufq_slurpn(q, 0, reader, reader_ctx, pnread);
  552. }