فهرست منبع

http2: refactor HTTP/2 polling logic to fix deadlock and crash issue.

Nick Peng 2 هفته پیش
والد
کامیت
ff1b837b30

+ 196 - 183
src/dns_client/client_http2.c

@@ -51,17 +51,17 @@ static int _dns_client_send_http2_stream(struct dns_server_info *server_info, st
 	struct client_dns_server_flag_https *https_flag = &server_info->flags.https;
 	char content_length[32];
 
+	if (http2_ctx == NULL) {
+		return -1;
+	}
+
 	/* Create HTTP/2 stream */
 	http2_stream = http2_stream_new(http2_ctx);
 	if (http2_stream == NULL) {
+		tlog(TLOG_WARN, "create http2 stream failed");
 		return -1;
 	}
 
-	pthread_mutex_lock(&server_info->lock);
-	conn_stream->http2_stream = http2_stream;
-	pthread_mutex_unlock(&server_info->lock);
-	http2_stream_set_ex_data(http2_stream, conn_stream);
-
 	/* Set request headers */
 	snprintf(content_length, sizeof(content_length), "%d", len);
 	struct http2_header_pair headers[] = {{"content-type", "application/dns-message"},
@@ -70,36 +70,23 @@ static int _dns_client_send_http2_stream(struct dns_server_info *server_info, st
 										  {NULL, NULL}};
 
 	if (http2_stream_set_request(http2_stream, "POST", https_flag->path, headers) < 0) {
-		pthread_mutex_lock(&server_info->lock);
-		conn_stream->http2_stream = NULL;
-		pthread_mutex_unlock(&server_info->lock);
-		http2_stream_put(http2_stream);
-		return -1;
+		goto errout;
 	}
 
 	/* Write request body */
 	if (http2_stream_write_body(http2_stream, (const uint8_t *)data, len, 1) < 0) {
-		pthread_mutex_lock(&server_info->lock);
-		conn_stream->http2_stream = NULL;
-		pthread_mutex_unlock(&server_info->lock);
-		http2_stream_put(http2_stream);
-		return -1;
+		goto errout;
 	}
 
-	return 0;
-}
-
-/* Helper function to clean up a finished HTTP/2 stream */
-static void _dns_client_cleanup_http2_stream(struct dns_server_info *server_info, struct dns_conn_stream *conn_stream,
-											 struct http2_stream *http2_stream)
-{
 	pthread_mutex_lock(&server_info->lock);
-	conn_stream->http2_stream = NULL;
-	list_del_init(&conn_stream->server_list);
+	conn_stream->http2_stream = http2_stream;
 	pthread_mutex_unlock(&server_info->lock);
+	http2_stream_set_ex_data(http2_stream, conn_stream);
+	return 0;
 
-	http2_stream_put(http2_stream);
-	_dns_client_conn_stream_put(conn_stream);
+errout:
+	http2_stream_close(http2_stream);
+	return -1;
 }
 
 /* Helper function to release a conn_stream and its references on error */
@@ -207,14 +194,14 @@ static int _dns_client_http2_pending_data(struct dns_conn_stream *stream, struct
 
 	pthread_mutex_lock(&server_info->lock);
 	if (list_empty(&stream->server_list)) {
-		list_add_tail(&stream->server_list, &server_info->conn_stream_list);
 		_dns_client_conn_stream_get(stream);
+		list_add_tail(&stream->server_list, &server_info->conn_stream_list);
 	}
 	stream->server_info = server_info;
 
 	if (list_empty(&stream->query_list)) {
-		list_add_tail(&stream->query_list, &query->conn_stream_list);
 		_dns_client_conn_stream_get(stream);
+		list_add_tail(&stream->query_list, &query->conn_stream_list);
 	}
 	stream->query = query;
 	pthread_mutex_unlock(&server_info->lock);
@@ -253,6 +240,12 @@ int _dns_client_send_http2(struct dns_server_info *server_info, struct dns_query
 	struct http2_ctx *http2_ctx = NULL;
 	int ret = -1;
 
+	if (len > DNS_IN_PACKSIZE - 128) {
+		tlog(TLOG_ERROR, "packet size is invalid.");
+		ret = -1;
+		goto out;
+	}
+
 	/* Create connection stream for this request */
 	stream = _dns_client_conn_stream_new();
 	if (stream == NULL) {
@@ -261,30 +254,15 @@ int _dns_client_send_http2(struct dns_server_info *server_info, struct dns_query
 	}
 	stream->type = DNS_SERVER_HTTPS;
 
-	/* Set stream pointers but don't add to lists yet */
-	stream->server_info = server_info;
-	stream->query = query;
-
-	if (len > DNS_IN_PACKSIZE - 128) {
-		tlog(TLOG_ERROR, "packet size is invalid.");
-		goto errout;
-	}
-
 	/* If not connected, buffer the data and return */
 	if (server_info->status != DNS_SERVER_STATUS_CONNECTED) {
 		ret = _dns_client_http2_pending_data(stream, server_info, query, packet, len);
-		if (ret != 0) {
-			goto errout;
-		}
 		goto out;
 	}
 
 	/* If connected but context not ready, buffer it too (will be flushed in process_http2) */
 	if (server_info->http2_ctx == NULL) {
 		ret = _dns_client_http2_pending_data(stream, server_info, query, packet, len);
-		if (ret != 0) {
-			goto errout;
-		}
 		goto out;
 	}
 
@@ -294,20 +272,18 @@ int _dns_client_send_http2(struct dns_server_info *server_info, struct dns_query
 		tlog(TLOG_ERROR, "send http2 stream failed.");
 		/* Fall back to buffering the data */
 		ret = _dns_client_http2_pending_data(stream, server_info, query, packet, len);
-		if (ret != 0) {
-			/* avoid memory leak */
-			goto errout;
-		}
 		goto out;
 	}
 
 	/* Now add stream to lists since HTTP/2 stream was successfully created */
 	pthread_mutex_lock(&server_info->lock);
-	list_add_tail(&stream->server_list, &server_info->conn_stream_list);
 	_dns_client_conn_stream_get(stream);
+	list_add_tail(&stream->server_list, &server_info->conn_stream_list);
+	stream->server_info = server_info;
 
-	list_add_tail(&stream->query_list, &query->conn_stream_list);
 	_dns_client_conn_stream_get(stream);
+	list_add_tail(&stream->query_list, &query->conn_stream_list);
+	stream->query = query;
 	pthread_mutex_unlock(&server_info->lock);
 
 	/* Flush data immediately */
@@ -332,177 +308,214 @@ int _dns_client_send_http2(struct dns_server_info *server_info, struct dns_query
 		}
 	}
 
+	ret = 0;
 out:
-	if (stream) {
-		/* Release initial reference - stream is now managed by the lists */
-		_dns_client_conn_stream_put(stream);
-	}
-	return 0;
-
-errout:
 	if (stream) {
 		_dns_client_conn_stream_put(stream);
 	}
 
-	return -1;
+	return ret;
 }
 
-int _dns_client_process_http2(struct dns_server_info *server_info, struct epoll_event *event, unsigned long now)
+static int _dns_client_http2_init_ctx(struct dns_server_info *server_info)
 {
 	struct http2_ctx *http2_ctx = server_info->http2_ctx;
+	struct client_dns_server_flag_https *https_flag = &server_info->flags.https;
 	int ret = 0;
 
-	/* Initialize context if needed (e.g. first time in EPOLLOUT) */
-	if (http2_ctx == NULL) {
-		struct client_dns_server_flag_https *https_flag = &server_info->flags.https;
-		pthread_mutex_lock(&server_info->lock);
-		if (server_info->http2_ctx == NULL) {
-			http2_ctx =
-				http2_ctx_client_new(https_flag->httphost, _http2_bio_read, _http2_bio_write, server_info, NULL);
-			if (http2_ctx == NULL) {
-				pthread_mutex_unlock(&server_info->lock);
-				tlog(TLOG_ERROR, "init http2 context failed.");
-				goto errout;
-			}
-			server_info->http2_ctx = http2_ctx;
-			/* server_info now owns the context (refcount=1 from _new) */
-			pthread_mutex_unlock(&server_info->lock);
+	if (http2_ctx != NULL) {
+		return 0;
+	}
 
-			/* Perform HTTP/2 handshake */
-			ret = http2_ctx_handshake(http2_ctx);
-			if (ret < 0) {
-				tlog(TLOG_ERROR, "http2 handshake failed.");
-				goto errout;
-			}
-		} else {
-			http2_ctx = server_info->http2_ctx;
+	pthread_mutex_lock(&server_info->lock);
+	if (server_info->http2_ctx == NULL) {
+		http2_ctx = http2_ctx_client_new(https_flag->httphost, _http2_bio_read, _http2_bio_write, server_info, NULL);
+		if (http2_ctx == NULL) {
 			pthread_mutex_unlock(&server_info->lock);
+			tlog(TLOG_ERROR, "init http2 context failed.");
+			return -1;
 		}
+		server_info->http2_ctx = http2_ctx;
+		/* server_info now owns the context (refcount=1 from _new) */
+		pthread_mutex_unlock(&server_info->lock);
+
+		/* Perform HTTP/2 handshake */
+		ret = http2_ctx_handshake(http2_ctx);
+		if (ret < 0) {
+			tlog(TLOG_ERROR, "http2 handshake failed.");
+			return -1;
+		}
+	} else {
+		pthread_mutex_unlock(&server_info->lock);
 	}
 
-	/* Handle EPOLLOUT - flush pending writes and send buffered requests */
-	if (event->events & EPOLLOUT) {
-		/* Send buffered requests */
-		_dns_client_send_buffered_http2_requests(server_info);
+	return 0;
+}
 
-		/* Flush pending writes */
-		_dns_client_flush_http2_writes(http2_ctx);
+static int _dns_client_http2_process_write(struct dns_server_info *server_info)
+{
+	struct http2_ctx *http2_ctx = server_info->http2_ctx;
+	int epoll_events = EPOLLIN;
 
-		/* Update epoll events based on write status */
-		int epoll_events = EPOLLIN;
-		if (http2_ctx_want_write(http2_ctx)) {
-			epoll_events |= EPOLLOUT;
-		}
+	/* Send buffered requests */
+	_dns_client_send_buffered_http2_requests(server_info);
 
-		if (server_info->fd > 0) {
-			struct epoll_event mod_event;
-			memset(&mod_event, 0, sizeof(mod_event));
-			mod_event.events = epoll_events;
-			mod_event.data.ptr = server_info;
-			if (epoll_ctl(client.epoll_fd, EPOLL_CTL_MOD, server_info->fd, &mod_event) != 0) {
-				tlog(TLOG_ERROR, "epoll ctl failed, %s", strerror(errno));
-				goto errout;
-			}
+	/* Flush pending writes */
+	_dns_client_flush_http2_writes(http2_ctx);
+
+	/* Update epoll events based on write status */
+	if (http2_ctx_want_write(http2_ctx)) {
+		epoll_events |= EPOLLOUT;
+	}
+
+	if (server_info->fd > 0) {
+		struct epoll_event mod_event;
+		memset(&mod_event, 0, sizeof(mod_event));
+		mod_event.events = epoll_events;
+		mod_event.data.ptr = server_info;
+		if (epoll_ctl(client.epoll_fd, EPOLL_CTL_MOD, server_info->fd, &mod_event) != 0) {
+			tlog(TLOG_ERROR, "epoll ctl failed, %s", strerror(errno));
+			return -1;
 		}
 	}
+	return 0;
+}
 
-	/* Handle EPOLLIN - read and process data */
-	if (event->events & EPOLLIN) {
-		struct http2_poll_item poll_items[10];
-		int poll_count = 0;
-		uint8_t response_body[DNS_IN_PACKSIZE];
-		int response_len = 0;
-		int loop_count = 0;
-		const int MAX_LOOP_COUNT = 128;
+static int _dns_client_http2_process_stream_one(struct dns_server_info *server_info,
+												struct dns_conn_stream *conn_stream)
+{
+	struct http2_stream *http2_stream = conn_stream->http2_stream;
+	uint8_t response_body[DNS_IN_PACKSIZE];
+	int response_len = 0;
+	int ret = 0;
 
-		/* Ensure handshake is complete before polling */
-		ret = http2_ctx_handshake(http2_ctx);
-		if (ret == 0) {
-			/* Handshake in progress, need more data */
-			return 0;
-		} else if (ret < 0) {
-			tlog(TLOG_ERROR, "http2 handshake failed.");
-			goto errout;
-		}
-		/* ret == 1 means handshake complete, continue */
-
-		/* Poll and process streams until no more ready */
-		while (loop_count++ < MAX_LOOP_COUNT) {
-			/* Poll for stream readiness */
-			ret = http2_ctx_poll(http2_ctx, poll_items, 10, &poll_count);
-			if (ret < 0) {
-				if (ret != HTTP2_ERR_EOF) {
-					tlog(TLOG_DEBUG, "http2 poll failed, ret=%d", ret);
-				}
-				goto errout;
-			}
+	if (http2_stream == NULL || conn_stream->query == NULL) {
+		return 1;
+	}
 
-			if (poll_count == 0) {
-				/* No more ready streams */
-				break;
-			}
+	/* Check HTTP status code first */
+	int status = http2_stream_get_status(http2_stream);
+	if (status > 0 && status != 200) {
+		tlog(TLOG_WARN, "http2 server query from %s:%d failed, server return http code: %d", server_info->ip,
+			 server_info->port, status);
+		server_info->prohibit = 1;
+		return 1;
+	}
 
-			/* Process each ready stream */
-			for (int i = 0; i < poll_count; i++) {
-				struct http2_stream *http2_stream = poll_items[i].stream;
-				struct dns_conn_stream *conn_stream = NULL;
+	/* Read response body */
+	response_len = http2_stream_read_body(http2_stream, response_body, sizeof(response_body));
+	if (response_len <= 0) {
+		/* Error or no data - check if stream has ended */
+		goto out;
+	}
 
-				if (http2_stream == NULL || !poll_items[i].readable) {
-					continue;
-				}
+	/* Process DNS response */
+	ret = _dns_client_recv(server_info, response_body, response_len, &server_info->addr, server_info->ai_addrlen);
+	if (ret != 0) {
+		tlog(TLOG_ERROR, "process dns response failed");
+	}
 
-				/* Get conn_stream from stream's private data */
-				conn_stream = (struct dns_conn_stream *)http2_stream_get_ex_data(http2_stream);
-				if (conn_stream == NULL) {
-					tlog(TLOG_DEBUG, "conn_stream is null for http2 stream");
-					http2_stream_put(http2_stream);
-					continue;
-				}
+out:
+	if (http2_stream_is_end(http2_stream)) {
+		return 1;
+	}
 
-				/* Check HTTP status code first */
-				int status = http2_stream_get_status(http2_stream);
-				if (status > 0 && status != 200) {
-					tlog(TLOG_WARN, "http2 server query from %s:%d failed, server return http code: %d",
-						 server_info->ip, server_info->port, status);
-					server_info->prohibit = 1;
-					_dns_client_cleanup_http2_stream(server_info, conn_stream, http2_stream);
-					continue;
-				}
+	return 0;
+}
 
-				/* Read response body */
-				response_len = http2_stream_read_body(http2_stream, response_body, sizeof(response_body));
-				if (response_len < 0) {
-					/* Error or no data - check if stream has ended */
-					if (http2_stream_is_end(http2_stream)) {
-						_dns_client_cleanup_http2_stream(server_info, conn_stream, http2_stream);
+static int _dns_client_http2_process_read(struct dns_server_info *server_info)
+{
+	struct http2_ctx *http2_ctx = server_info->http2_ctx;
+	struct http2_poll_item poll_items[128];
+	int poll_count = 0;
+	int loop_count = 0;
+	const int MAX_LOOP_COUNT = 128;
+	struct dns_conn_stream *conn_stream = NULL;
+	int ret = 0;
+	int i = 0;
+
+	/* Ensure handshake is complete before polling */
+	ret = http2_ctx_handshake(http2_ctx);
+	if (ret == 0) {
+		/* Handshake in progress, need more data */
+		return 0;
+	} else if (ret < 0) {
+		tlog(TLOG_ERROR, "http2 handshake failed.");
+		return -1;
+	}
+
+	/* Poll and process streams until no more ready */
+	while (loop_count++ < MAX_LOOP_COUNT) {
+		/* Poll for stream readiness */
+		ret = http2_ctx_poll_readable(http2_ctx, poll_items, 128, &poll_count);
+		if (ret < 0) {
+			if (ret != HTTP2_ERR_EOF) {
+				tlog(TLOG_DEBUG, "http2 poll failed, ret=%d", ret);
+			}
+			return -1;
+		}
+
+		if (poll_count == 0) {
+			break;
+		}
+
+		/* Process each ready stream */
+		for (i = 0; i < poll_count; i++) {
+			struct http2_stream *stream = poll_items[i].stream;
+			if (stream == NULL) {
+				continue;
+			}
+
+			conn_stream = (struct dns_conn_stream *)http2_stream_get_ex_data(stream);
+			if (conn_stream == NULL) {
+				continue;
+			}
+
+			if (poll_items[i].readable) {
+				int stream_ended = _dns_client_http2_process_stream_one(server_info, conn_stream);
+				if (stream_ended) {
+					int need_put = 0;
+					pthread_mutex_lock(&server_info->lock);
+					if (!list_empty(&conn_stream->server_list)) {
+						list_del_init(&conn_stream->server_list);
+						conn_stream->server_info = NULL;
+						need_put = 1;
 					}
-					continue;
-				}
+					pthread_mutex_unlock(&server_info->lock);
 
-				if (response_len == 0) {
-					/* EOF - check if stream has ended */
-					if (http2_stream_is_end(http2_stream)) {
-						_dns_client_cleanup_http2_stream(server_info, conn_stream, http2_stream);
+					if (need_put) {
+						_dns_client_conn_stream_put(conn_stream);
 					}
-					continue;
 				}
+			}
+		}
 
-				/* Process DNS response */
-				ret = _dns_client_recv(server_info, response_body, response_len, &server_info->addr,
-									   server_info->ai_addrlen);
-				if (ret != 0) {
-					tlog(TLOG_ERROR, "process dns response failed");
-				}
+		if (poll_count < 128) {
+			break;
+		}
+	}
+	return 0;
+}
 
-				/* Check if stream has ended after reading body */
-				if (http2_stream_is_end(http2_stream)) {
-					_dns_client_cleanup_http2_stream(server_info, conn_stream, http2_stream);
-				}
-			}
+int _dns_client_process_http2(struct dns_server_info *server_info, struct epoll_event *event, unsigned long now)
+{
+	if (server_info->http2_ctx == NULL) {
+		if (_dns_client_http2_init_ctx(server_info) < 0) {
+			return -1;
 		}
 	}
 
+	if (event->events & EPOLLOUT) {
+		if (_dns_client_http2_process_write(server_info) < 0) {
+			return -1;
+		}
+	}
+
+	/* Always process read, as write might have read data (e.g. WINDOW_UPDATE),
+	   or there might be pending data in SSL/HTTP2 buffers */
+	if (_dns_client_http2_process_read(server_info) < 0) {
+		return -1;
+	}
+
 	return 0;
-errout:
-	return -1;
 }

+ 1 - 2
src/dns_client/client_socket.c

@@ -127,14 +127,13 @@ void _dns_client_close_socket_ext(struct dns_server_info *server_info, int no_de
 				_dns_client_conn_stream_put(conn_stream);
 			}
 		} else if (server_info->type == DNS_SERVER_HTTPS) {
-			/* Clean up HTTP/2 streams for this server */
 			struct dns_conn_stream *conn_stream = NULL;
 			struct dns_conn_stream *tmp = NULL;
 
 			list_for_each_entry_safe(conn_stream, tmp, &server_info->conn_stream_list, server_list)
 			{
 				if (conn_stream->http2_stream) {
-					http2_stream_put(conn_stream->http2_stream);
+					http2_stream_close(conn_stream->http2_stream);
 					conn_stream->http2_stream = NULL;
 				}
 

+ 25 - 41
src/dns_client/conn_stream.c

@@ -52,44 +52,31 @@ void _dns_client_conn_stream_put(struct dns_conn_stream *stream)
 	int refcnt = atomic_dec_return(&stream->refcnt);
 	if (refcnt) {
 		if (refcnt < 0) {
-			BUG("BUG: stream refcnt is %d", refcnt);
+			BUG("BUG: stream  %p, refcnt is %d", stream, refcnt);
 		}
 		return;
 	}
 
-	if (stream->type == DNS_SERVER_QUIC || stream->type == DNS_SERVER_HTTP3) {
-		/* Clean up QUIC stream */
-		if (stream->quic_stream) {
-			SSL_free(stream->quic_stream);
-			stream->quic_stream = NULL;
-		}
-	} else if (stream->type == DNS_SERVER_HTTPS) {
-		/* Clean up HTTP/2 stream */
-		if (stream->http2_stream) {
-			struct http2_stream *http2_stream = stream->http2_stream;
-			stream->http2_stream = NULL;
-			http2_stream_put(http2_stream);
-		}
+	if (stream->quic_stream) {
+		SSL_free(stream->quic_stream);
+		stream->quic_stream = NULL;
 	}
 
-	if (stream->server_info) {
-		struct dns_server_info *server_info = stream->server_info;
-		stream->server_info = NULL;
-		pthread_mutex_lock(&server_info->lock);
-		/* Remove from server list and release reference */
-		if (!list_empty(&stream->server_list)) {
-			list_del_init(&stream->server_list);
-			stream->server_info = NULL;
-			_dns_client_conn_stream_put(stream);
-		}
-		pthread_mutex_unlock(&server_info->lock);
+	if (stream->http2_stream) {
+		struct http2_stream *http2_stream = stream->http2_stream;
+		stream->http2_stream = NULL;
+		http2_stream_close(http2_stream);
 	}
 
-	/* Remove from query list and release reference */
-	if (!list_empty(&stream->query_list)) {
+	if (stream->query) {
 		list_del_init(&stream->query_list);
 		stream->query = NULL;
-		_dns_client_conn_stream_put(stream);
+	}
+
+	if (stream->server_info) {
+		pthread_mutex_lock(&stream->server_info->lock);
+		list_del_init(&stream->server_list);
+		pthread_mutex_unlock(&stream->server_info->lock);
 	}
 
 	free(stream);
@@ -110,22 +97,19 @@ void _dns_client_conn_server_streams_free(struct dns_server_info *server_info, s
 
 		list_del_init(&stream->server_list);
 		stream->server_info = NULL;
-		if (stream->type == DNS_SERVER_QUIC || stream->type == DNS_SERVER_HTTP3) {
-			if (stream->quic_stream) {
+		if (stream->quic_stream) {
 #if defined(OSSL_QUIC1_VERSION) && !defined(OPENSSL_NO_QUIC)
-				SSL_stream_reset(stream->quic_stream, NULL, 0);
+			SSL_stream_reset(stream->quic_stream, NULL, 0);
 #endif
-				SSL_free(stream->quic_stream);
-				stream->quic_stream = NULL;
-			}
-		} else if (stream->type == DNS_SERVER_HTTPS) {
-			/* Clean up HTTP/2 stream */
-			if (stream->http2_stream) {
-				struct http2_stream *http2_stream = stream->http2_stream;
-				stream->http2_stream = NULL;
-				http2_stream_put(http2_stream);
-			}
+			SSL_free(stream->quic_stream);
+			stream->quic_stream = NULL;
 		}
+
+		if (stream->http2_stream) {
+			http2_stream_put(stream->http2_stream);
+			stream->http2_stream = NULL;
+		}
+
 		_dns_client_conn_stream_put(stream);
 	}
 	pthread_mutex_unlock(&server_info->lock);

+ 2 - 4
src/dns_client/dns_client.h

@@ -230,10 +230,8 @@ struct dns_conn_stream {
 	struct dns_query_struct *query;
 	struct dns_server_info *server_info;
 
-	union {
-		SSL *quic_stream;
-		struct http2_stream *http2_stream;
-	};
+	SSL *quic_stream;
+	struct http2_stream *http2_stream;
 	dns_server_type_t type;
 };
 

+ 7 - 3
src/dns_server/connection.c

@@ -18,6 +18,7 @@
 
 #include "connection.h"
 #include "dns_server.h"
+#include "server_http2.h"
 
 #include "smartdns/http2.h"
 
@@ -69,7 +70,11 @@ void _dns_server_conn_release(struct dns_server_conn_head *conn)
 			tls_server->ssl_ctx = NULL;
 		}
 	} else if (conn->type == DNS_CONN_TYPE_HTTP2_STREAM) {
-		/* Nothing to release for stream connection wrapper, just free(conn) at the end */
+		struct dns_server_conn_http2_stream *http2_stream = (struct dns_server_conn_http2_stream *)conn;
+		if (http2_stream->stream != NULL) {
+			http2_stream_close(http2_stream->stream);
+			http2_stream->stream = NULL;
+		}
 	}
 
 	if (conn->fd > 0) {
@@ -105,7 +110,7 @@ void _dns_server_close_socket(void)
 		/* Force cleanup of TLS/HTTPS client connections to prevent memory leaks */
 		if (conn->type == DNS_CONN_TYPE_TLS_CLIENT || conn->type == DNS_CONN_TYPE_HTTPS_CLIENT) {
 			struct dns_server_conn_tls_client *tls_client = (struct dns_server_conn_tls_client *)conn;
-			
+
 			/* Free SSL connection */
 			if (tls_client->ssl != NULL) {
 				SSL_free(tls_client->ssl);
@@ -165,7 +170,6 @@ int _dns_server_client_close(struct dns_server_conn_head *conn)
 		struct dns_server_conn_tls_client *tls_client = (struct dns_server_conn_tls_client *)conn;
 		if (tls_client->http2_ctx != NULL) {
 			http2_ctx_close(tls_client->http2_ctx);
-			http2_ctx_put(tls_client->http2_ctx);
 			tls_client->http2_ctx = NULL;
 		}
 	}

+ 29 - 12
src/dns_server/server_http2.c

@@ -93,6 +93,9 @@ static void _dns_server_http2_process_stream(struct dns_server_conn_tls_client *
 		len = http2_stream_read_body(stream, buf, sizeof(buf));
 		if (len < 0) {
 			/* Error or no data yet */
+			if (http2_stream_is_end(stream)) {
+				goto close_out;
+			}
 			return;
 		}
 
@@ -105,7 +108,7 @@ static void _dns_server_http2_process_stream(struct dns_server_conn_tls_client *
 		char *base64_query = NULL;
 
 		if (http2_stream_get_ex_data(stream)) {
-			return;
+			goto close_out;
 		}
 		http2_stream_set_ex_data(stream, (void *)1);
 
@@ -114,33 +117,34 @@ static void _dns_server_http2_process_stream(struct dns_server_conn_tls_client *
 
 		if (path == NULL) {
 			_dns_server_http2_send_response(stream, 404, "text/plain", "Not Found", 9);
-			return;
+			goto close_out;
 		}
 
 		/* Check path prefix */
 		if (strncmp(path, "/dns-query", 10) != 0) {
 			_dns_server_http2_send_response(stream, 404, "text/plain", "Not Found", 9);
-			return;
+			goto close_out;
 		}
 
 		/* Parse query string */
 		char *query_val = http2_stream_get_query_param(stream, "dns");
 		if (query_val == NULL) {
 			_dns_server_http2_send_response(stream, 400, "text/plain", "Bad Request", 11);
-			return;
+			goto close_out;
 		}
 
 		base64_query = malloc(DNS_IN_PACKSIZE);
 		if (base64_query == NULL) {
 			free(query_val);
-			return;
+			_dns_server_http2_send_response(stream, 500, "text/plain", "Bad Request", 11);
+			goto close_out;
 		}
 
 		if (urldecode(base64_query, DNS_IN_PACKSIZE, query_val) < 0) {
 			free(query_val);
 			free(base64_query);
 			_dns_server_http2_send_response(stream, 400, "text/plain", "Bad Request", 11);
-			return;
+			goto close_out;
 		}
 		free(query_val);
 
@@ -149,19 +153,19 @@ static void _dns_server_http2_process_stream(struct dns_server_conn_tls_client *
 
 		if (len <= 0) {
 			_dns_server_http2_send_response(stream, 400, "text/plain", "Bad Request", 11);
-			return;
+			goto close_out;
 		}
 	} else {
 		_dns_server_http2_send_response(stream, 405, "text/plain", "Method Not Allowed", 18);
-		return;
+		goto close_out;
 	}
 
 	if (len > 0) {
 		/* Create a fake connection object for this stream */
 		struct dns_server_conn_http2_stream *stream_conn = zalloc(1, sizeof(struct dns_server_conn_http2_stream));
 		if (stream_conn == NULL) {
-			tlog(TLOG_ERROR, "malloc failed for stream conn");
-			return;
+			_dns_server_http2_send_response(stream, 500, "text/plain", "Bad Request", 11);
+			goto close_out;
 		}
 
 		/* Initialize the fake connection */
@@ -185,6 +189,14 @@ static void _dns_server_http2_process_stream(struct dns_server_conn_tls_client *
 		/* Release our reference (request holds one now) */
 		_dns_server_conn_release(&stream_conn->head);
 	}
+
+	return;
+
+close_out:
+	if (stream != NULL) {
+		/* Close stream on error */
+		http2_stream_close(stream);
+	}
 }
 
 int _dns_server_process_http2(struct dns_server_conn_tls_client *tls_client, struct epoll_event *event,
@@ -198,11 +210,15 @@ int _dns_server_process_http2(struct dns_server_conn_tls_client *tls_client, str
 		struct http2_settings settings;
 		memset(&settings, 0, sizeof(settings));
 		settings.max_concurrent_streams = DNS_SERVER_HTTP2_MAX_CONCURRENT_STREAMS;
-		ctx = http2_ctx_server_new("smartdns-server", _http2_server_bio_read, _http2_server_bio_write, tls_client, &settings);
+		ctx = http2_ctx_server_new("smartdns-server", _http2_server_bio_read, _http2_server_bio_write, tls_client,
+								   &settings);
 		if (ctx == NULL) {
 			tlog(TLOG_ERROR, "init http2 context failed.");
 			return -1;
 		}
+		if (tls_client->http2_ctx != NULL) {
+			http2_ctx_close(ctx);
+		}
 		tls_client->http2_ctx = ctx;
 
 		/* Perform initial handshake */
@@ -256,7 +272,7 @@ int _dns_server_process_http2(struct dns_server_conn_tls_client *tls_client, str
 		/* Poll and process */
 		while (loop_count++ < MAX_LOOP_COUNT) {
 			poll_count = 0;
-			ret = http2_ctx_poll(ctx, poll_items, 10, &poll_count);
+			ret = http2_ctx_poll_readable(ctx, poll_items, 10, &poll_count);
 			if (ret < 0) {
 				if (ret == HTTP2_ERR_EAGAIN) {
 					break;
@@ -279,6 +295,7 @@ int _dns_server_process_http2(struct dns_server_conn_tls_client *tls_client, str
 					if (poll_items[i].readable) {
 						struct http2_stream *stream = http2_ctx_accept_stream(ctx);
 						if (stream) {
+							/* process immi */
 							_dns_server_http2_process_stream(tls_client, stream);
 						}
 					}

+ 254 - 72
src/http_parse/http2.c

@@ -113,6 +113,7 @@ struct http2_stream {
 	int end_stream_received;
 	int end_stream_sent;
 	int end_stream_read_handled; /* Flag to track if EOF has been reported to app */
+	int accepted;                /* Flag to track if stream has been accepted by app */
 	int window_size;
 	int body_decompressed; /* Flag to track if body has been decompressed */
 	void *ex_data;
@@ -533,6 +534,16 @@ static struct http2_stream *http2_create_stream(struct http2_ctx *ctx, int strea
 	stream->refcount = 1; /* Initial reference count */
 	stream->stream_id = stream_id;
 	stream->state = HTTP2_STREAM_IDLE;
+
+	/* Determine if stream is accepted (locally initiated) or needs accept (peer initiated) */
+	if (ctx->is_client) {
+		/* Client: Odd IDs are local (accepted), Even IDs are remote (need accept) */
+		stream->accepted = (stream_id % 2) != 0;
+	} else {
+		/* Server: Even IDs are local (accepted), Odd IDs are remote (need accept) */
+		stream->accepted = (stream_id % 2) == 0;
+	}
+
 	stream->window_size = HTTP2_DEFAULT_WINDOW_SIZE;
 	stream->body_buffer_size = 8192;
 	stream->body_buffer = malloc(stream->body_buffer_size);
@@ -548,23 +559,27 @@ static struct http2_stream *http2_create_stream(struct http2_ctx *ctx, int strea
 	stream->next = ctx->streams;
 	ctx->streams = stream;
 	ctx->active_streams++;
-
 	http2_ctx_get(ctx);
 
 	return stream;
 }
 
-static void http2_remove_stream(struct http2_ctx *ctx, struct http2_stream *stream)
+static int http2_remove_stream(struct http2_ctx *ctx, struct http2_stream *stream)
 {
+	int ret = -1;
+	pthread_mutex_lock(&ctx->mutex);
 	struct http2_stream **p = &ctx->streams;
 	while (*p) {
 		if (*p == stream) {
 			*p = stream->next;
 			ctx->active_streams--;
-			return;
+			ret = 0;
+			break;
 		}
 		p = &(*p)->next;
 	}
+	pthread_mutex_unlock(&ctx->mutex);
+	return ret;
 }
 
 static int http2_process_data_frame(struct http2_ctx *ctx, int stream_id, const uint8_t *data, int len, uint8_t flags)
@@ -925,10 +940,16 @@ void http2_ctx_put(struct http2_ctx *ctx)
 		return;
 	}
 
-	if (__sync_sub_and_fetch(&ctx->refcount, 1) > 0) {
+	int refcnt = __sync_sub_and_fetch(&ctx->refcount, 1);
+	if (refcnt > 0) {
 		return; /* Still has references */
 	}
 
+	if (refcnt < 0) {
+		BUG("http2_ctx_put: negative reference count");
+		return;
+	}
+
 	/* Reference count reached zero, free the context */
 	pthread_mutex_lock(&ctx->mutex);
 
@@ -936,11 +957,9 @@ void http2_ctx_put(struct http2_ctx *ctx)
 	while (ctx->streams) {
 		struct http2_stream *next = ctx->streams->next;
 		ctx->streams->next = NULL; /* Detach from list */
-
-		/* Manually free stream without calling unref to avoid recursion */
-		http2_free_headers(ctx->streams);
-		free(ctx->streams->body_buffer);
-		free(ctx->streams);
+		ctx->streams->ctx = NULL;  /* Break circular reference */
+		ctx->streams->state = HTTP2_STREAM_CLOSED;
+		http2_stream_put(ctx->streams);
 
 		ctx->streams = next;
 	}
@@ -975,7 +994,7 @@ void http2_ctx_close(struct http2_ctx *ctx)
 
 	pthread_mutex_unlock(&ctx->mutex);
 
-	/* Now free streams outside the lock */
+	/* Now free streams outside the lock - just break circular references */
 	while (streams_to_free) {
 		struct http2_stream *stream = streams_to_free;
 		streams_to_free = stream->next;
@@ -984,13 +1003,15 @@ void http2_ctx_close(struct http2_ctx *ctx)
 		stream->ctx = NULL;
 		stream->next = NULL;
 
-		/* Release the reference that the context was holding */
-		/* This will properly decrement refcount and only free if refcount reaches 0 */
 		http2_stream_put(stream);
 
+		/* Do not release stream reference - caller is responsible for calling http2_stream_put */
 		/* Release the reference to ctx that was taken when the stream was created */
 		http2_ctx_put(ctx);
 	}
+
+	/* release context reference held by caller */
+	http2_ctx_put(ctx);
 }
 
 struct http2_stream *http2_stream_get(struct http2_stream *stream)
@@ -1008,17 +1029,24 @@ void http2_stream_put(struct http2_stream *stream)
 		return;
 	}
 
-	if (__sync_sub_and_fetch(&stream->refcount, 1) > 0) {
+	int refcnt = __sync_sub_and_fetch(&stream->refcount, 1);
+	if (refcnt > 0) {
 		return; /* Still has references */
 	}
 
+	if (refcnt < 0) {
+		BUG("http2_stream: negative reference count");
+		return;
+	}
+
 	/* Reference count reached zero, free the stream */
 	struct http2_ctx *ctx = stream->ctx;
 
 	if (ctx) {
-		pthread_mutex_lock(&ctx->mutex);
-		http2_remove_stream(ctx, stream);
-		pthread_mutex_unlock(&ctx->mutex);
+		if (http2_remove_stream(ctx, stream) == 0) {
+			/* release ownership held by ctx */
+			http2_stream_put(stream);
+		}
 		http2_ctx_put(ctx);
 	}
 
@@ -1156,8 +1184,13 @@ struct http2_stream *http2_ctx_accept_stream(struct http2_ctx *ctx)
 	struct http2_stream *stream = ctx->streams;
 	while (stream) {
 		if ((ctx->is_client && (stream->stream_id % 2) == 0) || (!ctx->is_client && (stream->stream_id % 2) == 1)) {
-			if (!list_empty(&stream->header_list.list) && !stream->end_stream_sent) {
+			if (!stream->accepted && !list_empty(&stream->header_list.list) && !stream->end_stream_sent) {
+				stream->accepted = 1;
 				pthread_mutex_unlock(&ctx->mutex);
+				if (stream) {
+					/* take owership */
+					http2_stream_get(stream);
+				}
 				return stream;
 			}
 		}
@@ -1168,10 +1201,8 @@ struct http2_stream *http2_ctx_accept_stream(struct http2_ctx *ctx)
 	return NULL;
 }
 
-int http2_ctx_poll(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items, int *ret_count)
+static int _http2_ctx_io_process(struct http2_ctx *ctx)
 {
-	pthread_mutex_lock(&ctx->mutex);
-
 	/* Try to flush any pending writes first */
 	if (ctx->pending_write_len > 0) {
 		uint8_t dummy = 0;
@@ -1179,43 +1210,59 @@ int http2_ctx_poll(struct http2_ctx *ctx, struct http2_poll_item *items, int max
 	}
 
 	/* Process frames */
-	int ret = http2_process_frames(ctx);
+	return http2_process_frames(ctx);
+}
 
-	/* Note: We continue even if http2_process_frames returns error (like EOF),
-	   because we might have received data that made streams readable.
-	   We will return the error at the end if no streams are ready. */
+static int _http2_ctx_check_new_streams(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items, int *count)
+{
+	if (ctx->is_client || *count >= max_items) {
+		return 0;
+	}
 
-	int count = 0;
+	struct http2_stream *stream = ctx->streams;
+	int has_new_stream = 0;
 
-	/* For server, check if there are new peer-initiated streams to accept */
-	if (!ctx->is_client && count < max_items) {
-		struct http2_stream *stream = ctx->streams;
-		int has_new_stream = 0;
-
-		while (stream) {
-			/* Server accepts odd stream IDs (client-initiated) */
-			/* Stream is ready to accept when it has received complete request (END_STREAM) */
-			if ((stream->stream_id % 2) == 1 && !list_empty(&stream->header_list.list) && stream->end_stream_received &&
-				!stream->end_stream_sent) {
-				has_new_stream = 1;
-				break;
-			}
-			stream = stream->next;
+	while (stream) {
+		/* Server accepts odd stream IDs (client-initiated) */
+		/* Stream is ready to accept when it has received complete request (END_STREAM) */
+		if ((stream->stream_id % 2) == 1 && !stream->accepted && !list_empty(&stream->header_list.list) &&
+			stream->end_stream_received && !stream->end_stream_sent) {
+			has_new_stream = 1;
+			break;
 		}
+		stream = stream->next;
+	}
 
-		if (has_new_stream) {
-			/* Return server context item (stream = NULL) to indicate new connection */
-			items[count].stream = NULL;
-			items[count].readable = 1;
-			items[count].writable = 0;
-			count++;
-		}
+	if (has_new_stream) {
+		/* Return server context item (stream = NULL) to indicate new connection */
+		items[*count].stream = NULL;
+		items[*count].readable = 1;
+		items[*count].writable = 0;
+		(*count)++;
+		return 1;
 	}
+	return 0;
+}
 
-	/* Return existing streams */
+static void _http2_ctx_collect_ready_streams(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items,
+											 int *count, int check_writable)
+{
 	struct http2_stream *stream = ctx->streams;
+	struct http2_stream *prev = NULL;
+	struct http2_stream *ready_head = NULL;
+	struct http2_stream *ready_tail = NULL;
+
+	while (stream && *count < max_items) {
+		struct http2_stream *next_stream = stream->next;
+		int remove_from_list = 0;
+
+		/* Only return streams that have been accepted */
+		if (!stream->accepted) {
+			prev = stream;
+			stream = next_stream;
+			continue;
+		}
 
-	while (stream && count < max_items) {
 		/* Stream is readable if:
 		 * 1. Has unread body data in buffer, OR
 		 * 2. Stream has ended (all data including headers received)
@@ -1226,16 +1273,76 @@ int http2_ctx_poll(struct http2_ctx *ctx, struct http2_poll_item *items, int max
 		int readable = has_body_data || stream_ended;
 		int writable = stream->state == HTTP2_STREAM_OPEN || stream->state == HTTP2_STREAM_HALF_CLOSED_REMOTE;
 
-		if (readable || writable) {
-			items[count].stream = stream;
-			items[count].readable = readable;
-			items[count].writable = writable;
-			count++;
+		if (readable || (check_writable && writable)) {
+			items[*count].stream = stream;
+			items[*count].readable = readable;
+			items[*count].writable = writable;
+			(*count)++;
+			remove_from_list = 1;
 		}
 
-		stream = stream->next;
+		if (remove_from_list) {
+			/* Remove from current position */
+			if (prev) {
+				prev->next = next_stream;
+			} else {
+				ctx->streams = next_stream;
+			}
+
+			/* Add to ready list */
+			stream->next = NULL;
+			if (ready_tail) {
+				ready_tail->next = stream;
+				ready_tail = stream;
+			} else {
+				ready_head = stream;
+				ready_tail = stream;
+			}
+
+			/* Move to next, prev stays same */
+			stream = next_stream;
+		} else {
+			prev = stream;
+			stream = next_stream;
+		}
 	}
 
+	/* Append ready list to the end of ctx->streams */
+	if (ready_head) {
+		if (ctx->streams == NULL) {
+			ctx->streams = ready_head;
+		} else {
+			/* Find tail */
+			struct http2_stream *tail = ctx->streams;
+			if (prev && prev->next == NULL) {
+				/* Optimization: prev might be the tail if we iterated to the end */
+				tail = prev;
+			} else {
+				while (tail->next) {
+					tail = tail->next;
+				}
+			}
+			tail->next = ready_head;
+		}
+	}
+}
+
+static int _http2_ctx_poll(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items, int *ret_count,
+						   int check_writable)
+{
+	pthread_mutex_lock(&ctx->mutex);
+
+	int ret = _http2_ctx_io_process(ctx);
+
+	/* Note: We continue even if http2_process_frames returns error (like EOF),
+	   because we might have received data that made streams readable.
+	   We will return the error at the end if no streams are ready. */
+
+	int count = 0;
+
+	_http2_ctx_check_new_streams(ctx, items, max_items, &count);
+	_http2_ctx_collect_ready_streams(ctx, items, max_items, &count, check_writable);
+
 	*ret_count = count;
 	pthread_mutex_unlock(&ctx->mutex);
 
@@ -1257,6 +1364,16 @@ int http2_ctx_poll(struct http2_ctx *ctx, struct http2_poll_item *items, int max
 	return 0;
 }
 
+int http2_ctx_poll(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items, int *ret_count)
+{
+	return _http2_ctx_poll(ctx, items, max_items, ret_count, 1);
+}
+
+int http2_ctx_poll_readable(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items, int *ret_count)
+{
+	return _http2_ctx_poll(ctx, items, max_items, ret_count, 0);
+}
+
 struct http2_stream *http2_stream_new(struct http2_ctx *ctx)
 {
 	pthread_mutex_lock(&ctx->mutex);
@@ -1265,11 +1382,51 @@ struct http2_stream *http2_stream_new(struct http2_ctx *ctx)
 	ctx->next_stream_id += 2;
 
 	struct http2_stream *stream = http2_create_stream(ctx, stream_id);
+	if (stream) {
+		/* take owership */
+		http2_stream_get(stream);
+	}
 
 	pthread_mutex_unlock(&ctx->mutex);
 	return stream;
 }
 
+static int http2_send_rst_stream(struct http2_ctx *ctx, int stream_id, uint32_t error_code)
+{
+	uint8_t frame[HTTP2_FRAME_HEADER_SIZE + 4];
+
+	http2_write_frame_header(frame, 4, HTTP2_FRAME_RST_STREAM, 0, stream_id);
+	write_uint32(frame + HTTP2_FRAME_HEADER_SIZE, error_code);
+
+	return http2_send_frame(ctx, frame, sizeof(frame));
+}
+
+void http2_stream_close(struct http2_stream *stream)
+{
+	if (stream == NULL) {
+		return;
+	}
+
+	struct http2_ctx *ctx = stream->ctx;
+	if (ctx) {
+		pthread_mutex_lock(&ctx->mutex);
+
+		/* Send RST_STREAM to close the stream */
+		http2_send_rst_stream(ctx, stream->stream_id, 0); /* NO_ERROR */
+		if (http2_remove_stream(ctx, stream) == 0) {
+			/* release ownership held by ctx */
+			http2_stream_put(stream);
+		}
+		stream->ctx = NULL;
+		pthread_mutex_unlock(&ctx->mutex);
+		http2_ctx_put(ctx);
+	}
+	/* Mark stream as closed */
+	stream->state = HTTP2_STREAM_CLOSED;
+
+	http2_stream_put(stream);
+}
+
 int http2_stream_get_id(struct http2_stream *stream)
 {
 	if (!stream) {
@@ -1537,7 +1694,7 @@ static int http2_try_decompress_body(struct http2_stream *stream)
 	}
 
 	/* Only decompress when the stream is fully received or connection is closed */
-	if (!stream->end_stream_received && stream->ctx->status >= 0) {
+	if (!stream->end_stream_received && stream->ctx && stream->ctx->status >= 0) {
 		return 0;
 	}
 
@@ -1583,7 +1740,9 @@ int http2_stream_read_body(struct http2_stream *stream, uint8_t *data, int len)
 	}
 
 	struct http2_ctx *ctx = stream->ctx;
-	pthread_mutex_lock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_lock(&ctx->mutex);
+	}
 
 	/* NOTE: We do NOT call http2_process_frames here!
 	 * The caller should use http2_ctx_poll to process frames for all streams.
@@ -1599,8 +1758,9 @@ int http2_stream_read_body(struct http2_stream *stream, uint8_t *data, int len)
 		/* Check if it's a compression format we handle */
 		if (strcasecmp(content_encoding, "gzip") == 0 || strcasecmp(content_encoding, "deflate") == 0) {
 			/* If stream not ended and connection is healthy, return EAGAIN */
-			if (!stream->end_stream_received && ctx->status >= 0) {
-				pthread_mutex_unlock(&ctx->mutex);
+			if (!stream->end_stream_received && (!ctx || ctx->status >= 0)) {
+				if (ctx)
+					pthread_mutex_unlock(&ctx->mutex);
 				errno = EAGAIN;
 				return -1;
 			}
@@ -1609,10 +1769,12 @@ int http2_stream_read_body(struct http2_stream *stream, uint8_t *data, int len)
 
 	int available = stream->body_buffer_len - stream->body_read_offset;
 	if (available <= 0) {
-		pthread_mutex_unlock(&ctx->mutex);
+		if (ctx) {
+			pthread_mutex_unlock(&ctx->mutex);
+		}
 
 		/* If stream ended or connection has error, return 0 (EOF) */
-		if (stream->end_stream_received || ctx->status < 0) {
+		if (stream->end_stream_received || (!ctx || ctx->status < 0)) {
 			stream->end_stream_read_handled = 1;
 			return 0;
 		}
@@ -1626,7 +1788,9 @@ int http2_stream_read_body(struct http2_stream *stream, uint8_t *data, int len)
 	memcpy(data, stream->body_buffer + stream->body_read_offset, to_read);
 	stream->body_read_offset += to_read;
 
-	pthread_mutex_unlock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_unlock(&ctx->mutex);
+	}
 	return to_read;
 }
 
@@ -1637,7 +1801,9 @@ int http2_stream_body_available(struct http2_stream *stream)
 	}
 
 	struct http2_ctx *ctx = stream->ctx;
-	pthread_mutex_lock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_lock(&ctx->mutex);
+	}
 
 	/* Try to decompress if needed */
 	http2_try_decompress_body(stream);
@@ -1646,8 +1812,10 @@ int http2_stream_body_available(struct http2_stream *stream)
 	const char *content_encoding = http2_stream_get_header_value(stream, "content-encoding");
 	if (content_encoding && !stream->body_decompressed) {
 		if (strcasecmp(content_encoding, "gzip") == 0 || strcasecmp(content_encoding, "deflate") == 0) {
-			if (!stream->end_stream_received && ctx->status >= 0) {
-				pthread_mutex_unlock(&ctx->mutex);
+			if (!stream->end_stream_received && (!ctx || ctx->status >= 0)) {
+				if (ctx) {
+					pthread_mutex_unlock(&ctx->mutex);
+				}
 				return 0;
 			}
 		}
@@ -1655,7 +1823,9 @@ int http2_stream_body_available(struct http2_stream *stream)
 
 	int available = stream->body_buffer_len - stream->body_read_offset;
 
-	pthread_mutex_unlock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_unlock(&ctx->mutex);
+	}
 	return available > 0 ? 1 : 0;
 }
 
@@ -1666,7 +1836,9 @@ int http2_stream_is_end(struct http2_stream *stream)
 	}
 
 	struct http2_ctx *ctx = stream->ctx;
-	pthread_mutex_lock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_lock(&ctx->mutex);
+	}
 
 	/* Try to decompress if needed - this might change body_buffer_len */
 	http2_try_decompress_body(stream);
@@ -1674,11 +1846,13 @@ int http2_stream_is_end(struct http2_stream *stream)
 	int is_end = stream->end_stream_received && (stream->body_read_offset >= stream->body_buffer_len);
 
 	/* If connection is closed/error, and we have read all buffered data, consider stream ended */
-	if (!is_end && ctx->status < 0 && stream->body_read_offset >= stream->body_buffer_len) {
+	if (!is_end && (!ctx || ctx->status < 0) && stream->body_read_offset >= stream->body_buffer_len) {
 		is_end = 1;
 	}
 
-	pthread_mutex_unlock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_unlock(&ctx->mutex);
+	}
 	return is_end;
 }
 
@@ -1689,11 +1863,15 @@ void http2_stream_set_ex_data(struct http2_stream *stream, void *data)
 	}
 
 	struct http2_ctx *ctx = stream->ctx;
-	pthread_mutex_lock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_lock(&ctx->mutex);
+	}
 
 	stream->ex_data = data;
 
-	pthread_mutex_unlock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_unlock(&ctx->mutex);
+	}
 }
 
 void *http2_stream_get_ex_data(struct http2_stream *stream)
@@ -1703,11 +1881,15 @@ void *http2_stream_get_ex_data(struct http2_stream *stream)
 	}
 
 	struct http2_ctx *ctx = stream->ctx;
-	pthread_mutex_lock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_lock(&ctx->mutex);
+	}
 
 	void *data = stream->ex_data;
 
-	pthread_mutex_unlock(&ctx->mutex);
+	if (ctx) {
+		pthread_mutex_unlock(&ctx->mutex);
+	}
 	return data;
 }
 

+ 16 - 1
src/include/smartdns/http2.h

@@ -86,7 +86,7 @@ struct http2_ctx *http2_ctx_server_new(const char *server, http2_bio_read_fn bio
 									   void *private_data, const struct http2_settings *settings);
 
 /**
- * Close an HTTP/2 context and release all streams
+ * Close an HTTP/2 context, release all streams and release ownership.
  * This is used to break circular references between context and streams
  * @param ctx Context to close
  */
@@ -130,6 +130,16 @@ struct http2_stream *http2_ctx_accept_stream(struct http2_ctx *ctx);
  */
 int http2_ctx_poll(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items, int *ret_count);
 
+/**
+ * Poll streams for readiness (only readable streams)
+ * @param ctx HTTP/2 context
+ * @param items Array to fill with poll results
+ * @param max_items Maximum number of items to return
+ * @param ret_count Output: number of items returned
+ * @return 0 on success, -1 on error
+ */
+int http2_ctx_poll_readable(struct http2_ctx *ctx, struct http2_poll_item *items, int max_items, int *ret_count);
+
 /**
  * Check if context wants to read (EAGAIN on last read)
  * @param ctx HTTP/2 context
@@ -165,6 +175,11 @@ struct http2_stream *http2_stream_new(struct http2_ctx *ctx);
  * @param stream Stream to free
  */
 
+/**
+ * Close a stream and release ownership
+ * @param stream Stream to close
+ */
+void http2_stream_close(struct http2_stream *stream);
 
 /**
  * Increase reference count of stream

+ 3 - 2
test/Makefile

@@ -22,6 +22,7 @@ SMARTDNS_TEST_LIB=$(SMARTDNS_SRC_DIR)/libsmartdns-test.a
 CXXFLAGS += -g
 CXXFLAGS += -DTEST
 CXXFLAGS += -I./ -I../src -I../src/include
+CXXFLAGS += -fno-omit-frame-pointer -Wstrict-aliasing -funwind-tables
 
 TEST_SOURCES := $(wildcard *.cc) $(wildcard */*.cc) $(wildcard */*/*.cc)
 TEST_OBJECTS := $(patsubst %.cc, %.o, $(TEST_SOURCES))
@@ -33,7 +34,7 @@ ifeq ($(filter -j,$(MAKEFLAGS)),)
 MAKEFLAGS += -j$(shell nproc)
 endif
 
-LDFLAGS += -lssl -lcrypto -lpthread -ldl -lgtest -lstdc++ -lm
+LDFLAGS += -lssl -lcrypto -lpthread -ldl -lgtest -lstdc++ -lm -rdynamic
 LDFLAGS += $(EXTRA_LDFLAGS)
 
 .PHONY: all clean test $(SMARTDNS_TEST_LIB)
@@ -47,7 +48,7 @@ test: $(BIN)
 	./$(BIN)
 
 $(SMARTDNS_TEST_LIB): 
-	$(MAKE) -C $(SMARTDNS_SRC_DIR) libsmartdns-test.a
+	$(MAKE) DEBUG=1 -C $(SMARTDNS_SRC_DIR) libsmartdns-test.a
 
 clean:
 	$(RM) $(OBJS) $(BIN)

+ 214 - 24
test/cases/test-http2-stress.cc

@@ -2,10 +2,10 @@
 #include "server.h"
 #include "smartdns/dns.h"
 #include "gtest/gtest.h"
-#include <thread>
-#include <vector>
 #include <atomic>
 #include <chrono>
+#include <thread>
+#include <vector>
 
 // Test rapid connection/disconnection to verify refcount handling
 TEST(HTTP2Stress, RapidConnectionCycle)
@@ -57,8 +57,7 @@ log-level debug
 		threads.emplace_back([&success_count, &failure_count]() {
 			smartdns::Client client;
 			if (client.Query("domain.com", 61053)) {
-				if (client.GetStatus() == "NOERROR" && 
-					client.GetAnswerNum() > 0 &&
+				if (client.GetStatus() == "NOERROR" && client.GetAnswerNum() > 0 &&
 					client.GetAnswer()[0].GetData() == "1.2.3.4") {
 					success_count++;
 				} else {
@@ -70,13 +69,12 @@ log-level debug
 		});
 	}
 
-	for (auto& t : threads) {
+	for (auto &t : threads) {
 		t.join();
 	}
 
-	std::cout << "Success: " << success_count.load() 
-			  << " Failure: " << failure_count.load() << std::endl;
-	
+	std::cout << "Success: " << success_count.load() << " Failure: " << failure_count.load() << std::endl;
+
 	// Most queries should succeed
 	EXPECT_GT(success_count.load(), 45);
 	EXPECT_LT(failure_count.load(), 5);
@@ -112,12 +110,12 @@ log-level debug
 		});
 	}
 
-	for (auto& t : threads) {
+	for (auto &t : threads) {
 		t.join();
 	}
 
 	std::cout << "Total successful queries: " << total_queries.load() << std::endl;
-	
+
 	// At least some queries should succeed
 	EXPECT_GT(total_queries.load(), 150);
 }
@@ -148,7 +146,7 @@ log-level debug
 	// Restart upstream server to force reconnection
 	server_wrap.Stop();
 	std::this_thread::sleep_for(std::chrono::milliseconds(500));
-	
+
 	server_wrap.Start(R"""(bind-https [::]:60053 -alpn h2
 address /test.com/5.6.7.8
 log-level debug
@@ -208,12 +206,12 @@ log-level debug
 	}
 
 	server_thread.join();
-	for (auto& t : query_threads) {
+	for (auto &t : query_threads) {
 		t.join();
 	}
 
 	std::cout << "Buffered queries succeeded: " << success_count.load() << std::endl;
-	
+
 	// Most buffered queries should eventually succeed
 	EXPECT_GT(success_count.load(), 5);
 }
@@ -253,13 +251,12 @@ log-level debug
 		});
 	}
 
-	for (auto& t : threads) {
+	for (auto &t : threads) {
 		t.join();
 	}
 
-	std::cout << "Total attempts: " << total_attempts.load() 
-			  << " Success: " << success_count.load() << std::endl;
-	
+	std::cout << "Total attempts: " << total_attempts.load() << " Success: " << success_count.load() << std::endl;
+
 	// Should have some successes despite one server being down
 	EXPECT_GT(success_count.load(), 0);
 	EXPECT_EQ(total_attempts.load(), 30);
@@ -281,7 +278,7 @@ log-level debug
 			// Ensure previous instance is fully stopped
 			std::this_thread::sleep_for(std::chrono::milliseconds(300));
 		}
-		
+
 		server_wrap.Start(R"""(bind-https [::]:60053 -alpn h2
 address /test.com/1.2.3.4
 log-level debug
@@ -319,6 +316,7 @@ TEST(HTTP2Stress, LongRunningConnection)
 {
 	smartdns::Server server_wrap;
 	smartdns::Server server;
+	int count = 20;
 
 	server.Start(R"""(bind [::]:61053
 server https://127.0.0.1:60053/dns-query -no-check-certificate -alpn h2
@@ -334,7 +332,7 @@ log-level debug
 	std::atomic<int> failure_count{0};
 
 	// Send many queries over time
-	for (int i = 0; i < 100; i++) {
+	for (int i = 0; i < count; i++) {
 		smartdns::Client client;
 		if (client.Query("test.com", 61053)) {
 			if (client.GetStatus() == "NOERROR") {
@@ -347,15 +345,207 @@ log-level debug
 		}
 
 		// Small delay between queries
-		if (i % 10 == 0) {
+		if (i % count == 0) {
 			std::this_thread::sleep_for(std::chrono::milliseconds(10));
 		}
 	}
 
-	std::cout << "Long-running test - Success: " << success_count.load() 
-			  << " Failure: " << failure_count.load() << std::endl;
-	
+	std::cout << "Long-running test - Success: " << success_count.load() << " Failure: " << failure_count.load()
+			  << std::endl;
+
 	// Most queries should succeed
-	EXPECT_GT(success_count.load(), 95);
+	EXPECT_GT(success_count.load(), count - 5);
 	EXPECT_LT(failure_count.load(), 5);
 }
+
+// Test high concurrency on the same HTTP/2 connection (multiple streams)
+TEST(HTTP2Stress, HighConcurrencySameConnection)
+{
+	smartdns::Server server_wrap;
+	smartdns::Server server;
+
+	server.Start(R"""(bind [::]:61053
+server https://127.0.0.1:60053/dns-query -no-check-certificate -alpn h2
+log-level debug
+cache-size 0
+)""");
+
+	server_wrap.Start(R"""(bind-https [::]:60053 -alpn h2
+address /test.com/1.2.3.4
+address /example.com/5.6.7.8
+address /domain.com/9.10.11.12
+log-level debug
+)""");
+
+	std::vector<std::thread> threads;
+	std::atomic<int> success_count{0};
+	std::atomic<int> failure_count{0};
+	const int num_threads = 10;
+	const int queries_per_thread = 10;
+
+	// Launch many threads making multiple queries each
+	// This should reuse the same HTTP/2 connection for multiple streams
+	for (int i = 0; i < num_threads; i++) {
+		threads.emplace_back([i, &success_count, &failure_count, queries_per_thread]() {
+			for (int j = 0; j < queries_per_thread; j++) {
+				smartdns::Client client;
+				std::string domain;
+
+				// Use different domains to test concurrent streams
+				switch (j % 3) {
+				case 0:
+					domain = "test.com";
+					break;
+				case 1:
+					domain = "example.com";
+					break;
+				case 2:
+					domain = "domain.com";
+					break;
+				}
+
+				if (client.Query(domain.c_str(), 61053)) {
+					if (client.GetStatus() == "NOERROR" && client.GetAnswerNum() > 0) {
+						success_count++;
+					} else {
+						failure_count++;
+						GTEST_ASSERT_TRUE(false) << "Query failed for " << domain << " Status: " << client.GetStatus();
+					}
+				} else {
+					failure_count++;
+				}
+
+				// Small random delay to create more interleaving
+				if ((i + j) % 7 == 0) {
+					std::this_thread::sleep_for(std::chrono::milliseconds(1));
+				}
+			}
+		});
+	}
+
+	for (auto &t : threads) {
+		t.join();
+	}
+
+	int total_queries = num_threads * queries_per_thread;
+	std::cout << "High concurrency test - Total queries: " << total_queries << " Success: " << success_count.load()
+			  << " Failure: " << failure_count.load() << std::endl;
+
+	// Most queries should succeed
+	EXPECT_GT(success_count.load(), total_queries * 0.9);
+	EXPECT_LT(failure_count.load(), total_queries * 0.1);
+}
+
+// Test rapid fire queries on same connection with minimal delays
+TEST(HTTP2Stress, RapidFireSameConnection)
+{
+	smartdns::Server server_wrap;
+	smartdns::Server server;
+
+	server.Start(R"""(bind [::]:61053
+server https://127.0.0.1:60053/dns-query -no-check-certificate -alpn h2
+log-level debug
+)""");
+
+	server_wrap.Start(R"""(bind-https [::]:60053 -alpn h2
+address /test.com/1.2.3.4
+log-level debug
+)""");
+
+	std::vector<std::thread> threads;
+	std::atomic<int> success_count{0};
+	std::atomic<int> failure_count{0};
+	const int num_threads = 20;
+	const int queries_per_thread = 25;
+
+	// Rapid fire queries from multiple threads on same connection
+	for (int i = 0; i < num_threads; i++) {
+		threads.emplace_back([&success_count, &failure_count, queries_per_thread]() {
+			for (int j = 0; j < queries_per_thread; j++) {
+				smartdns::Client client;
+				if (client.Query("test.com", 61053)) {
+					if (client.GetStatus() == "NOERROR") {
+						success_count++;
+					} else {
+						failure_count++;
+					}
+				} else {
+					failure_count++;
+				}
+				// No delay - maximum concurrency
+			}
+		});
+	}
+
+	for (auto &t : threads) {
+		t.join();
+	}
+
+	int total_queries = num_threads * queries_per_thread;
+	std::cout << "Rapid fire test - Total queries: " << total_queries << " Success: " << success_count.load()
+			  << " Failure: " << failure_count.load() << std::endl;
+
+	// Should handle high concurrency well
+	EXPECT_GT(success_count.load(), total_queries * 0.8);
+}
+
+// Test connection sharing under load
+TEST(HTTP2Stress, ConnectionSharingUnderLoad)
+{
+	smartdns::Server server_wrap;
+	smartdns::Server server;
+
+	server.Start(R"""(bind [::]:61053
+server https://127.0.0.1:60053/dns-query -no-check-certificate -alpn h2
+log-level debug
+)""");
+
+	server_wrap.Start(R"""(bind-https [::]:60053 -alpn h2
+address /test.com/1.2.3.4
+address /example.com/5.6.7.8
+log-level debug
+)""");
+
+	std::vector<std::thread> threads;
+	std::atomic<int> success_count{0};
+	std::atomic<int> failure_count{0};
+	const int num_threads = 50;
+	const int queries_per_thread = 10;
+
+	// Test connection reuse under sustained load
+	auto start_time = std::chrono::steady_clock::now();
+
+	for (int i = 0; i < num_threads; i++) {
+		threads.emplace_back([i, &success_count, &failure_count, queries_per_thread]() {
+			for (int j = 0; j < queries_per_thread; j++) {
+				smartdns::Client client;
+				std::string domain = (j % 2 == 0) ? "test.com" : "example.com";
+
+				if (client.Query(domain.c_str(), 61053)) {
+					if (client.GetStatus() == "NOERROR" && client.GetAnswerNum() > 0) {
+						success_count++;
+					} else {
+						failure_count++;
+					}
+				} else {
+					failure_count++;
+				}
+			}
+		});
+	}
+
+	for (auto &t : threads) {
+		t.join();
+	}
+
+	auto end_time = std::chrono::steady_clock::now();
+	auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time);
+
+	int total_queries = num_threads * queries_per_thread;
+	std::cout << "Connection sharing test - Total queries: " << total_queries << " Success: " << success_count.load()
+			  << " Failure: " << failure_count.load() << " Duration: " << duration.count() << "ms"
+			  << " QPS: " << (total_queries * 1000.0 / duration.count()) << std::endl;
+
+	// Should maintain good success rate under load
+	EXPECT_GT(success_count.load(), total_queries * 0.85);
+}

+ 366 - 33
test/cases/test-lib-http2.cc

@@ -1,4 +1,7 @@
 #include "gtest/gtest.h"
+#include <algorithm>
+#include <atomic>
+#include <chrono>
 #include <cstring>
 #include <fcntl.h>
 #include <iostream>
@@ -137,8 +140,8 @@ TEST_F(LIBHTTP2, Integrated)
 		http2_stream_set_response(stream, 200, headers, 2);
 		http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
 
-		usleep(100000);
-		http2_ctx_put(ctx);
+		http2_stream_close(stream);
+		http2_ctx_close(ctx);
 	});
 
 	std::thread client_thread([this]() {
@@ -204,8 +207,8 @@ TEST_F(LIBHTTP2, Integrated)
 		std::string resp((char *)response_body, response_body_len);
 		EXPECT_NE(resp.find("Echo Response"), std::string::npos);
 
-		http2_stream_put(stream);
-		http2_ctx_put(ctx);
+		http2_stream_close(stream);
+		http2_ctx_close(ctx);
 	});
 
 	server_thread.join();
@@ -236,6 +239,7 @@ TEST_F(LIBHTTP2, MultiStream)
 
 		int streams_completed = 0;
 		int max_iterations = 500;
+		std::set<struct http2_stream *> processed_streams;
 		while (streams_completed < NUM_STREAMS && max_iterations-- > 0) {
 			struct pollfd pfd = {server_sock, POLLIN, 0};
 			poll(&pfd, 1, 100);
@@ -246,31 +250,37 @@ TEST_F(LIBHTTP2, MultiStream)
 
 			for (int i = 0; i < count; i++) {
 				if (items[i].stream == nullptr && items[i].readable) {
-					http2_ctx_accept_stream(ctx);
+					struct http2_stream *s = http2_ctx_accept_stream(ctx);
 				} else if (items[i].stream && items[i].readable) {
 					struct http2_stream *stream = items[i].stream;
 					uint8_t buf[1024];
 					http2_stream_read_body(stream, buf, sizeof(buf));
 
 					if (http2_stream_is_end(stream)) {
-						char response[256];
-						int response_len =
-							snprintf(response, sizeof(response), "Echo from stream %d", http2_stream_get_id(stream));
-						char content_length[32];
-						snprintf(content_length, sizeof(content_length), "%d", response_len);
-						struct http2_header_pair headers[] = {{"content-type", "text/plain"},
-															  {"content-length", content_length}};
-						http2_stream_set_response(stream, 200, headers, 2);
-						http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
-						streams_completed++;
+						if (processed_streams.find(stream) == processed_streams.end()) {
+							char response[256];
+							int response_len = snprintf(response, sizeof(response), "Echo from stream %d",
+														http2_stream_get_id(stream));
+							char content_length[32];
+							snprintf(content_length, sizeof(content_length), "%d", response_len);
+							struct http2_header_pair headers[] = {{"content-type", "text/plain"},
+																  {"content-length", content_length}};
+							http2_stream_set_response(stream, 200, headers, 2);
+							http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
+							streams_completed++;
+							processed_streams.insert(stream);
+						}
 					}
 				}
 			}
 			usleep(2000);
 		}
 
-		usleep(100000);
-		http2_ctx_put(ctx);
+		for (auto stream : processed_streams) {
+			http2_stream_close(stream);
+		}
+
+		http2_ctx_close(ctx);
 	});
 
 	std::thread client_thread([this, NUM_STREAMS]() {
@@ -341,9 +351,9 @@ TEST_F(LIBHTTP2, MultiStream)
 		EXPECT_EQ(streams_completed, NUM_STREAMS);
 
 		for (int i = 0; i < NUM_STREAMS; i++) {
-			http2_stream_put(streams[i]);
+			http2_stream_close(streams[i]);
 		}
-		http2_ctx_put(ctx);
+		http2_ctx_close(ctx);
 	});
 
 	server_thread.join();
@@ -423,9 +433,8 @@ TEST_F(LIBHTTP2, EarlyStreamCreation)
 			{"content-type", "text/plain"}, {"content-length", content_length}, {NULL, NULL}};
 		http2_stream_set_response(stream, 200, headers, 2);
 		http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
-
-		usleep(100000);
-		http2_ctx_put(ctx);
+		http2_stream_close(stream);
+		http2_ctx_close(ctx);
 	});
 
 	std::thread client_thread([this]() {
@@ -495,8 +504,8 @@ TEST_F(LIBHTTP2, EarlyStreamCreation)
 		EXPECT_NE(resp.find("Echo Response"), std::string::npos);
 		EXPECT_NE(resp.find("test echo"), std::string::npos);
 
-		http2_stream_put(stream);
-		http2_ctx_put(ctx);
+		http2_stream_close(stream);
+		http2_ctx_close(ctx);
 	});
 
 	server_thread.join();
@@ -553,7 +562,7 @@ TEST_F(LIBHTTP2, ServerLoopTerminationOnDisconnect)
 			struct http2_poll_item items[10];
 			int count = 0;
 			http2_ctx_poll(ctx, items, 10, &count);
-			
+
 			int data_read = 0;
 			for (int i = 0; i < count; i++) {
 				if (items[i].stream == stream && items[i].readable) {
@@ -566,20 +575,20 @@ TEST_F(LIBHTTP2, ServerLoopTerminationOnDisconnect)
 					}
 				}
 			}
-			
+
 			if (!data_read && http2_stream_is_end(stream)) {
 				// If we are here, it means poll returned 0 items (or stream not readable),
 				// which is correct behavior after EOF is consumed.
 				// If the bug exists, poll would keep returning readable stream, and we would keep reading 0 bytes.
 				break;
 			}
-			
+
 			usleep(10000);
 		}
-		
+
 		EXPECT_LT(loop_count, 100) << "Server loop did not terminate (infinite loop detected)";
 
-		http2_ctx_put(ctx);
+		http2_ctx_close(ctx);
 	});
 
 	std::thread client_thread([this]() {
@@ -593,8 +602,9 @@ TEST_F(LIBHTTP2, ServerLoopTerminationOnDisconnect)
 			struct pollfd pfd = {client_sock, POLLIN, 0};
 			poll(&pfd, 1, 10);
 			ret = http2_ctx_handshake(ctx);
-			if (ret == 1) break;
-			if (ret < 0) break;
+			if (ret <= 1) {
+				break;
+			}
 		}
 		ASSERT_EQ(ret, 1);
 
@@ -604,9 +614,129 @@ TEST_F(LIBHTTP2, ServerLoopTerminationOnDisconnect)
 		struct http2_header_pair headers[] = {{"content-type", "text/plain"}, {NULL, NULL}};
 		http2_stream_set_request(stream, "POST", "/test", headers);
 		http2_stream_write_body(stream, (const uint8_t *)"test", 4, 1);
+		http2_stream_close(stream);
+		http2_ctx_close(ctx);
+	});
+
+	server_thread.join();
+	client_thread.join();
+}
+
+TEST_F(LIBHTTP2, StreamClose)
+{
+	std::thread server_thread([this]() {
+		struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
+		ASSERT_NE(ctx, nullptr);
+
+		// Handshake
+		int handshake_attempts = 200;
+		int ret = 0;
+		while (handshake_attempts-- > 0) {
+			struct pollfd pfd = {server_sock, POLLIN, 0};
+			int poll_ret = poll(&pfd, 1, 10);
+			if (poll_ret == 0) {
+				continue;
+			}
+			ret = http2_ctx_handshake(ctx);
+			if (ret == 1)
+				break;
+			if (ret < 0)
+				break;
+		}
+		ASSERT_EQ(ret, 1) << "Server handshake failed";
+
+		// Accept stream
+		struct http2_stream *stream = nullptr;
+		int max_attempts = 200;
+		while (max_attempts-- > 0 && !stream) {
+			struct pollfd pfd = {server_sock, POLLIN, 0};
+			poll(&pfd, 1, 100);
+			struct http2_poll_item items[10];
+			int count = 0;
+			http2_ctx_poll(ctx, items, 10, &count);
+			for (int i = 0; i < count; i++) {
+				if (items[i].stream == nullptr && items[i].readable) {
+					stream = http2_ctx_accept_stream(ctx);
+					if (stream)
+						break;
+				}
+			}
+			usleep(20000);
+		}
+		ASSERT_NE(stream, nullptr) << "Server failed to accept stream";
+
+		// Read request and send response
+		uint8_t buf[1024];
+		http2_stream_read_body(stream, buf, sizeof(buf));
+		http2_stream_set_response(stream, 200, NULL, 0);
+		http2_stream_write_body(stream, (const uint8_t *)"OK", 2, 1);
+
+		http2_stream_close(stream);
+		http2_ctx_close(ctx);
+	});
+
+	std::thread client_thread([this]() {
+		usleep(50000);
+		struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
+		ASSERT_NE(ctx, nullptr);
+
+		// Handshake
+		int handshake_attempts = 200;
+		int ret = 0;
+		while (handshake_attempts-- > 0) {
+			struct pollfd pfd = {client_sock, POLLIN, 0};
+			poll(&pfd, 1, 10);
+			ret = http2_ctx_handshake(ctx);
+			if (ret == 1)
+				break;
+			if (ret < 0)
+				break;
+		}
+		ASSERT_EQ(ret, 1) << "Client handshake failed";
+
+		// Create stream
+		struct http2_stream *stream = http2_stream_new(ctx);
+		ASSERT_NE(stream, nullptr);
+
+		// Send request
+		http2_stream_set_request(stream, "GET", "/test", NULL);
+		http2_stream_write_body(stream, NULL, 0, 1);
+
+		// Wait for response
+		int max_attempts = 200;
+		while (max_attempts-- > 0) {
+			struct pollfd pfd = {client_sock, POLLIN, 0};
+			poll(&pfd, 1, 100);
+			struct http2_poll_item items[10];
+			int count = 0;
+			http2_ctx_poll(ctx, items, 10, &count);
+			if (http2_stream_get_status(stream) > 0)
+				break;
+			usleep(20000);
+		}
+
+		// Close the stream explicitly
+		http2_stream_get(stream); // Keep reference for reading after close
+		http2_stream_close(stream);
+
+		// Verify stream is marked as closed (should still be able to read)
+		// After close, the stream should still be readable until all data is consumed
+		EXPECT_FALSE(http2_stream_is_end(stream)); // Should not be end yet since we haven't read response
+
+		// Read response (should still work after close)
+		uint8_t buf[1024];
+		int read_len = http2_stream_read_body(stream, buf, sizeof(buf));
+		EXPECT_GE(read_len, 0); // Should be able to read
+
+		// After reading all data, stream should be end
+		while (!http2_stream_is_end(stream)) {
+			read_len = http2_stream_read_body(stream, buf, sizeof(buf));
+			if (read_len <= 0) {
+				break;
+			}
+		}
+		EXPECT_TRUE(http2_stream_is_end(stream)); // Should be end after reading all data
 
-		usleep(200000); // Wait for server to process
-		
 		http2_stream_put(stream);
 		http2_ctx_put(ctx);
 	});
@@ -614,3 +744,206 @@ TEST_F(LIBHTTP2, ServerLoopTerminationOnDisconnect)
 	server_thread.join();
 	client_thread.join();
 }
+
+TEST_F(LIBHTTP2, ReferenceCountingNormal)
+{
+	// Test normal reference counting: ctx normal, stream released by business
+	struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
+	ASSERT_NE(ctx, nullptr);
+
+	// Create a stream (already has refcount = 1)
+	struct http2_stream *stream = http2_stream_new(ctx);
+	ASSERT_NE(stream, nullptr);
+
+	// Close context (should not free stream because business still holds reference)
+	http2_ctx_close(ctx);
+
+	// Business releases reference
+	http2_stream_close(stream);
+
+	// Now stream should be freed
+	// We can't directly check, but no crash should occur
+}
+
+TEST_F(LIBHTTP2, ReferenceCountingContextError)
+{
+	// Test reference counting when ctx has error but stream is still referenced by business
+	struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
+	ASSERT_NE(ctx, nullptr);
+
+	// Create a stream
+	struct http2_stream *stream = http2_stream_new(ctx);
+	ASSERT_NE(stream, nullptr);
+
+	// Simulate context error by closing the socket (connection broken)
+	close(client_sock);
+	client_sock = -1;
+
+	// Close context (should handle error gracefully)
+	http2_ctx_close(ctx);
+
+	// Business still holds reference, should be able to release it
+	http2_stream_close(stream);
+
+	// No crash should occur
+}
+
+TEST_F(LIBHTTP2, StressTest)
+{
+	const int NUM_STREAMS = 1024;
+	std::atomic<int> server_processed(0);
+	std::atomic<int> client_completed(0);
+	std::atomic<bool> test_completed(false);
+
+	std::thread server_thread([this, NUM_STREAMS, &server_processed, &test_completed]() {
+		struct http2_ctx *ctx = http2_ctx_server_new("test-server", bio_read, bio_write, &server_sock, NULL);
+		ASSERT_NE(ctx, nullptr);
+
+		// Handshake
+		auto start_time = std::chrono::steady_clock::now();
+		int ret = 0;
+		while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(5)) {
+			struct pollfd pfd = {server_sock, POLLIN, 0};
+			int poll_ret = poll(&pfd, 1, 10);
+			if (poll_ret == 0) {
+				continue;
+			}
+			ret = http2_ctx_handshake(ctx);
+			if (ret == 1)
+				break;
+			if (ret < 0)
+				break;
+		}
+		ASSERT_EQ(ret, 1) << "Server handshake failed";
+
+		std::vector<struct http2_stream *> streams;
+		start_time = std::chrono::steady_clock::now();
+		while (!test_completed && std::chrono::steady_clock::now() - start_time < std::chrono::seconds(30)) {
+			struct pollfd pfd = {server_sock, POLLIN, 0};
+			poll(&pfd, 1, 10);
+
+			struct http2_poll_item items[64];
+			int count = 0;
+			http2_ctx_poll(ctx, items, 64, &count);
+
+			for (int i = 0; i < count; i++) {
+				if (items[i].stream == nullptr && items[i].readable) {
+					struct http2_stream *stream = http2_ctx_accept_stream(ctx);
+					if (stream) {
+						streams.push_back(stream);
+					}
+				} else if (items[i].stream && items[i].readable) {
+					struct http2_stream *stream = items[i].stream;
+					uint8_t buf[1024];
+					while (http2_stream_read_body(stream, buf, sizeof(buf)) > 0)
+						;
+
+					if (http2_stream_is_end(stream)) {
+						char response[256];
+						int response_len = snprintf(response, sizeof(response), "Echo %d", http2_stream_get_id(stream));
+						char content_length[32];
+						snprintf(content_length, sizeof(content_length), "%d", response_len);
+						struct http2_header_pair headers[] = {{"content-type", "text/plain"},
+															  {"content-length", content_length}};
+						http2_stream_set_response(stream, 200, headers, 2);
+						http2_stream_write_body(stream, (const uint8_t *)response, response_len, 1);
+						server_processed++;
+					}
+				}
+			}
+		}
+
+		for (auto stream : streams) {
+			http2_stream_close(stream);
+		}
+		http2_ctx_close(ctx);
+	});
+
+	std::thread client_thread([this, NUM_STREAMS, &client_completed, &test_completed]() {
+		usleep(50000);
+		struct http2_ctx *ctx = http2_ctx_client_new("test-client", bio_read, bio_write, &client_sock, NULL);
+		ASSERT_NE(ctx, nullptr);
+
+		// Handshake
+		auto start_time = std::chrono::steady_clock::now();
+		int ret = 0;
+		while (std::chrono::steady_clock::now() - start_time < std::chrono::seconds(5)) {
+			struct pollfd pfd = {client_sock, POLLIN, 0};
+			poll(&pfd, 1, 10);
+			ret = http2_ctx_handshake(ctx);
+			if (ret == 1)
+				break;
+			if (ret < 0)
+				break;
+		}
+		ASSERT_EQ(ret, 1) << "Client handshake failed";
+
+		std::vector<struct http2_stream *> streams;
+		streams.reserve(NUM_STREAMS);
+		std::set<int> completed_ids;
+
+		auto process_events = [&](int timeout_ms) {
+			struct pollfd pfd = {client_sock, POLLIN, 0};
+			poll(&pfd, 1, timeout_ms);
+
+			struct http2_poll_item items[64];
+			int count = 0;
+			http2_ctx_poll(ctx, items, 64, &count);
+
+			for (int i = 0; i < count; i++) {
+				if (items[i].stream && items[i].readable) {
+					struct http2_stream *stream = items[i].stream;
+					uint8_t buf[1024];
+					while (http2_stream_read_body(stream, buf, sizeof(buf)) > 0)
+						;
+
+					if (http2_stream_is_end(stream)) {
+						int id = http2_stream_get_id(stream);
+						if (completed_ids.find(id) == completed_ids.end()) {
+							completed_ids.insert(id);
+							client_completed++;
+						}
+					}
+				}
+			}
+		};
+
+		for (int i = 0; i < NUM_STREAMS; i++) {
+			struct http2_stream *stream = http2_stream_new(ctx);
+			if (stream) {
+				streams.push_back(stream);
+				char path[64];
+				snprintf(path, sizeof(path), "/stream%d", i);
+				char body[64];
+				int body_len = snprintf(body, sizeof(body), "Req %d", i);
+
+				struct http2_header_pair headers[] = {{"content-type", "text/plain"}, {NULL, NULL}};
+				http2_stream_set_request(stream, "POST", path, headers);
+				http2_stream_write_body(stream, (const uint8_t *)body, body_len, 1);
+			}
+
+			// Process events periodically to prevent deadlock/buffer overflow
+			if (i % 10 == 0) {
+				process_events(0);
+			}
+		}
+		ASSERT_EQ(streams.size(), NUM_STREAMS);
+
+		start_time = std::chrono::steady_clock::now();
+		while (client_completed < NUM_STREAMS &&
+			   std::chrono::steady_clock::now() - start_time < std::chrono::seconds(30)) {
+			process_events(10);
+		}
+
+		EXPECT_EQ(client_completed, NUM_STREAMS);
+
+		for (auto stream : streams) {
+			http2_stream_close(stream);
+		}
+		http2_ctx_close(ctx);
+		test_completed = true;
+	});
+
+	server_thread.join();
+	client_thread.join();
+}