Browse Source

dns_server: multi-threading

Nick Peng 2 years ago
parent
commit
b1c4ad7afb
1 changed files with 98 additions and 4 deletions
  1. 98 4
      src/dns_server.c

+ 98 - 4
src/dns_server.c

@@ -62,6 +62,7 @@
 #define CACHE_AUTO_ENABLE_SIZE (1024 * 1024 * 128)
 #define EXPIRED_DOMAIN_PREFETCH_TIME (3600 * 8)
 #define DNS_MAX_DOMAIN_REFETCH_NUM 16
+#define DNS_WORKER_NUM 4
 
 #define RECV_ERROR_AGAIN 1
 #define RECV_ERROR_OK 0
@@ -252,9 +253,26 @@ struct dns_request {
 	struct dns_request_pending_list *request_pending_list;
 };
 
+struct dns_server_work_event {
+	struct list_head list;
+	struct dns_server_conn_head *conn;
+	unsigned char inpacket[DNS_IN_PACKSIZE];
+	int inpacket_len;
+	struct sockaddr_storage local;
+	socklen_t local_len;
+	struct sockaddr_storage from;
+	socklen_t from_len;
+};
+
 /* dns server data */
 struct dns_server {
 	atomic_t run;
+
+	pthread_t worker[DNS_WORKER_NUM];
+	pthread_mutex_t worker_notify_lock;
+	pthread_cond_t worker_notify_cond;
+	struct list_head work_list;
+
 	int epoll_fd;
 	int event_fd;
 	struct list_head conn_list;
@@ -4488,9 +4506,9 @@ errout:
 	return -1;
 }
 
-static int _dns_server_recv(struct dns_server_conn_head *conn, unsigned char *inpacket, int inpacket_len,
-							struct sockaddr_storage *local, socklen_t local_len, struct sockaddr_storage *from,
-							socklen_t from_len)
+static int _dns_server_process_work(struct dns_server_conn_head *conn, unsigned char *inpacket, int inpacket_len,
+									struct sockaddr_storage *local, socklen_t local_len, struct sockaddr_storage *from,
+									socklen_t from_len)
 {
 	int decode_len = 0;
 	int ret = -1;
@@ -4553,6 +4571,34 @@ errout:
 	return ret;
 }
 
+static int _dns_server_recv(struct dns_server_conn_head *conn, unsigned char *inpacket, int inpacket_len,
+							struct sockaddr_storage *local, socklen_t local_len, struct sockaddr_storage *from,
+							socklen_t from_len)
+{
+	struct dns_server_work_event *event = NULL;
+	event = (struct dns_server_work_event *)malloc(sizeof(struct dns_server_work_event));
+	if (event == NULL) {
+		tlog(TLOG_ERROR, "malloc failed.\n");
+		return -1;
+	}
+	memset(event, 0, sizeof(struct dns_server_work_event));
+	event->conn = conn;
+	memcpy(event->inpacket, inpacket, inpacket_len);
+	event->inpacket_len = inpacket_len;
+	memcpy(&event->local, local, local_len);
+	event->local_len = local_len;
+	memcpy(&event->from, from, from_len);
+	event->from_len = from_len;
+	INIT_LIST_HEAD(&event->list);
+
+	pthread_mutex_lock(&server.worker_notify_lock);
+	list_add_tail(&event->list, &server.work_list);
+	pthread_mutex_unlock(&server.worker_notify_lock);
+	pthread_cond_signal(&server.worker_notify_cond);
+
+	return 0;
+}
+
 static int _dns_server_setup_server_query_options(struct dns_request *request,
 												  struct dns_server_query_option *server_query_option)
 {
@@ -4774,7 +4820,7 @@ static int _dns_server_tcp_recv(struct dns_server_conn_tcp_client *tcpclient)
 			if (errno == EAGAIN) {
 				return RECV_ERROR_AGAIN;
 			}
-			
+
 			if (errno == ECONNRESET) {
 				return RECV_ERROR_CLOSE;
 			}
@@ -5198,6 +5244,34 @@ static void _dns_server_close_socket_server(void)
 	}
 }
 
+static void *_dns_server_worker(void *args)
+{
+	struct dns_server_work_event *work_event = NULL;
+
+	while (atomic_read(&server.run)) {
+		pthread_mutex_lock(&server.worker_notify_lock);
+		if (list_empty(&server.work_list)) {
+			pthread_cond_wait(&server.worker_notify_cond, &server.worker_notify_lock);
+		}
+
+		work_event = list_first_entry_or_null(&server.work_list, struct dns_server_work_event, list);
+		if (work_event) {
+			list_del_init(&work_event->list);
+		}
+		pthread_mutex_unlock(&server.worker_notify_lock);
+
+		if (work_event == NULL) {
+			continue;
+		}
+
+		_dns_server_process_work(work_event->conn, work_event->inpacket, work_event->inpacket_len, &work_event->local,
+								 work_event->local_len, &work_event->from, work_event->from_len);
+		free(work_event);
+	}
+
+	return NULL;
+}
+
 int dns_server_run(void)
 {
 	struct epoll_event events[DNS_MAX_EVENTS + 1];
@@ -5682,7 +5756,10 @@ int dns_server_init(void)
 	}
 
 	pthread_mutex_init(&server.request_list_lock, NULL);
+	pthread_mutex_init(&server.worker_notify_lock, NULL);
+	pthread_cond_init(&server.worker_notify_cond, NULL);
 	INIT_LIST_HEAD(&server.request_list);
+	INIT_LIST_HEAD(&server.work_list);
 	server.epoll_fd = epollfd;
 	atomic_set(&server.run, 1);
 
@@ -5700,6 +5777,14 @@ int dns_server_init(void)
 		goto errout;
 	}
 
+	for (int i = 0; i < DNS_WORKER_NUM; i++) {
+		ret = pthread_create(&server.worker[i], &attr, _dns_server_worker, NULL);
+		if (ret != 0) {
+			tlog(TLOG_ERROR, "create server work thread failed, %s\n", strerror(ret));
+			goto errout;
+		}
+	}
+
 	return 0;
 errout:
 	atomic_set(&server.run, 0);
@@ -5728,6 +5813,15 @@ void dns_server_exit(void)
 		close(server.event_fd);
 		server.event_fd = -1;
 	}
+
+	for (int i = 0; i < DNS_WORKER_NUM; i++) {
+		if (server.worker[i]) {
+			pthread_cond_broadcast(&server.worker_notify_cond);
+			pthread_join(server.worker[i], NULL);
+			server.worker[i] = 0;
+		}
+	}
+
 	_dns_server_close_socket();
 	_dns_server_cache_save();
 	_dns_server_request_remove_all();