Sfoglia il codice sorgente

ping: Optimize the ping callback thread to reduce inaccurate results caused by blocking

Nick Peng 2 anni fa
parent
commit
f619ca8f68
1 ha cambiato i file con 159 aggiunte e 17 eliminazioni
  1. 159 17
      src/fast_ping.c

+ 159 - 17
src/fast_ping.c

@@ -19,6 +19,7 @@
 #include "fast_ping.h"
 #include "fast_ping.h"
 #include "atomic.h"
 #include "atomic.h"
 #include "hashtable.h"
 #include "hashtable.h"
+#include "list.h"
 #include "tlog.h"
 #include "tlog.h"
 #include "util.h"
 #include "util.h"
 #include <arpa/inet.h>
 #include <arpa/inet.h>
@@ -37,6 +38,7 @@
 #include <string.h>
 #include <string.h>
 #include <sys/epoll.h>
 #include <sys/epoll.h>
 #include <sys/eventfd.h>
 #include <sys/eventfd.h>
+#include <sys/resource.h>
 #include <sys/socket.h>
 #include <sys/socket.h>
 #include <sys/types.h>
 #include <sys/types.h>
 #include <unistd.h>
 #include <unistd.h>
@@ -127,6 +129,15 @@ struct ping_host_struct {
 	struct fast_ping_packet packet;
 	struct fast_ping_packet packet;
 };
 };
 
 
+struct fast_ping_notify_event {
+	struct list_head list;
+	struct ping_host_struct *ping_host;
+	FAST_PING_RESULT ping_result;
+	unsigned int seq;
+	int ttl;
+	struct timeval tvresult;
+};
+
 struct fast_ping_struct {
 struct fast_ping_struct {
 	atomic_t run;
 	atomic_t run;
 	pthread_t tid;
 	pthread_t tid;
@@ -145,6 +156,10 @@ struct fast_ping_struct {
 	struct ping_host_struct udp6_host;
 	struct ping_host_struct udp6_host;
 
 
 	int event_fd;
 	int event_fd;
+	pthread_t notify_tid;
+	pthread_cond_t notify_cond;
+	pthread_mutex_t notify_lock;
+	struct list_head notify_event_list;
 
 
 	pthread_mutex_t map_lock;
 	pthread_mutex_t map_lock;
 	DECLARE_HASHTABLE(addrmap, 6);
 	DECLARE_HASHTABLE(addrmap, 6);
@@ -154,6 +169,8 @@ static struct fast_ping_struct ping;
 static atomic_t ping_sid = ATOMIC_INIT(0);
 static atomic_t ping_sid = ATOMIC_INIT(0);
 static int bool_print_log = 1;
 static int bool_print_log = 1;
 
 
+static void _fast_ping_host_put(struct ping_host_struct *ping_host);
+
 static void _fast_ping_wakup_thread(void)
 static void _fast_ping_wakup_thread(void)
 {
 {
 	uint64_t u = 1;
 	uint64_t u = 1;
@@ -378,6 +395,53 @@ static void _fast_ping_close_host_sock(struct ping_host_struct *ping_host)
 	ping_host->fd = -1;
 	ping_host->fd = -1;
 }
 }
 
 
+static void _fast_ping_release_notify_event(struct fast_ping_notify_event *ping_notify_event)
+{
+	pthread_mutex_lock(&ping.notify_lock);
+	list_del_init(&ping_notify_event->list);
+	pthread_mutex_unlock(&ping.notify_lock);
+
+	if (ping_notify_event->ping_host) {
+		_fast_ping_host_put(ping_notify_event->ping_host);
+		ping_notify_event->ping_host = NULL;
+	}
+	free(ping_notify_event);
+}
+
+static int _fast_ping_send_notify_event(struct ping_host_struct *ping_host, FAST_PING_RESULT ping_result,
+										unsigned int seq, int ttl, struct timeval *tvresult)
+{
+	struct fast_ping_notify_event *notify_event = NULL;
+
+	notify_event = malloc(sizeof(struct fast_ping_notify_event));
+	if (notify_event == NULL) {
+		goto errout;
+	}
+	memset(notify_event, 0, sizeof(struct fast_ping_notify_event));
+	INIT_LIST_HEAD(&notify_event->list);
+	notify_event->seq = seq;
+	notify_event->ttl = ttl;
+	notify_event->ping_result = ping_result;
+	notify_event->tvresult = *tvresult;
+
+	pthread_mutex_lock(&ping.notify_lock);
+	if (list_empty(&ping.notify_event_list)) {
+		pthread_cond_signal(&ping.notify_cond);
+	}
+	list_add_tail(&notify_event->list, &ping.notify_event_list);
+	notify_event->ping_host = ping_host;
+	_fast_ping_host_get(ping_host);
+	pthread_mutex_unlock(&ping.notify_lock);
+
+	return 0;
+
+errout:
+	if (notify_event) {
+		_fast_ping_release_notify_event(notify_event);
+	}
+	return -1;
+}
+
 static void _fast_ping_host_put(struct ping_host_struct *ping_host)
 static void _fast_ping_host_put(struct ping_host_struct *ping_host)
 {
 {
 	int ref_cnt = atomic_dec_and_test(&ping_host->ref);
 	int ref_cnt = atomic_dec_and_test(&ping_host->ref);
@@ -400,8 +464,7 @@ static void _fast_ping_host_put(struct ping_host_struct *ping_host)
 		tv.tv_sec = 0;
 		tv.tv_sec = 0;
 		tv.tv_usec = 0;
 		tv.tv_usec = 0;
 
 
-		ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_END, &ping_host->addr, ping_host->addr_len,
-								 ping_host->seq, ping_host->ttl, &tv, ping_host->error, ping_host->userptr);
+		_fast_ping_send_notify_event(ping_host, PING_RESULT_END, ping_host->seq, ping_host->ttl, &tv);
 	}
 	}
 
 
 	tlog(TLOG_DEBUG, "ping %s end, id %d", ping_host->host, ping_host->sid);
 	tlog(TLOG_DEBUG, "ping %s end, id %d", ping_host->host, ping_host->sid);
@@ -427,8 +490,7 @@ static void _fast_ping_host_remove(struct ping_host_struct *ping_host)
 		tv.tv_sec = 0;
 		tv.tv_sec = 0;
 		tv.tv_usec = 0;
 		tv.tv_usec = 0;
 
 
-		ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_END, &ping_host->addr, ping_host->addr_len,
-								 ping_host->seq, ping_host->ttl, &tv, ping_host->error, ping_host->userptr);
+		_fast_ping_send_notify_event(ping_host, PING_RESULT_END, ping_host->seq, ping_host->ttl, &tv);
 	}
 	}
 
 
 	_fast_ping_host_put(ping_host);
 	_fast_ping_host_put(ping_host);
@@ -664,6 +726,8 @@ static int _fast_ping_sendping(struct ping_host_struct *ping_host)
 	if (ret != 0) {
 	if (ret != 0) {
 		ping_host->error = errno;
 		ping_host->error = errno;
 		return ret;
 		return ret;
+	} else {
+		ping_host->error = 0;
 	}
 	}
 
 
 	return 0;
 	return 0;
@@ -1374,9 +1438,8 @@ static int _fast_ping_process_icmp(struct ping_host_struct *ping_host, struct ti
 	recv_ping_host->ttl = packet->ttl;
 	recv_ping_host->ttl = packet->ttl;
 	tv_sub(&tvresult, tvsend);
 	tv_sub(&tvresult, tvsend);
 	if (recv_ping_host->ping_callback) {
 	if (recv_ping_host->ping_callback) {
-		recv_ping_host->ping_callback(recv_ping_host, recv_ping_host->host, PING_RESULT_RESPONSE, &recv_ping_host->addr,
-									  recv_ping_host->addr_len, recv_ping_host->seq, recv_ping_host->ttl, &tvresult,
-									  ping_host->error, recv_ping_host->userptr);
+		_fast_ping_send_notify_event(recv_ping_host, PING_RESULT_RESPONSE, recv_ping_host->seq, recv_ping_host->ttl,
+									 &tvresult);
 	}
 	}
 
 
 	recv_ping_host->send = 0;
 	recv_ping_host->send = 0;
@@ -1409,9 +1472,7 @@ static int _fast_ping_process_tcp(struct ping_host_struct *ping_host, struct epo
 	}
 	}
 	tv_sub(&tvresult, tvsend);
 	tv_sub(&tvresult, tvsend);
 	if (ping_host->ping_callback) {
 	if (ping_host->ping_callback) {
-		ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_RESPONSE, &ping_host->addr,
-								 ping_host->addr_len, ping_host->seq, ping_host->ttl, &tvresult, ping_host->error,
-								 ping_host->userptr);
+		_fast_ping_send_notify_event(ping_host, PING_RESULT_RESPONSE, ping_host->seq, ping_host->ttl, &tvresult);
 	}
 	}
 
 
 	ping_host->send = 0;
 	ping_host->send = 0;
@@ -1505,9 +1566,8 @@ static int _fast_ping_process_udp(struct ping_host_struct *ping_host, struct tim
 	tvsend = &recv_ping_host->last;
 	tvsend = &recv_ping_host->last;
 	tv_sub(&tvresult, tvsend);
 	tv_sub(&tvresult, tvsend);
 	if (recv_ping_host->ping_callback) {
 	if (recv_ping_host->ping_callback) {
-		recv_ping_host->ping_callback(recv_ping_host, recv_ping_host->host, PING_RESULT_RESPONSE, &recv_ping_host->addr,
-									  recv_ping_host->addr_len, recv_ping_host->seq, recv_ping_host->ttl, &tvresult,
-									  ping_host->error, recv_ping_host->userptr);
+		_fast_ping_send_notify_event(recv_ping_host, PING_RESULT_RESPONSE, recv_ping_host->seq, recv_ping_host->ttl,
+									 &tvresult);
 	}
 	}
 
 
 	recv_ping_host->send = 0;
 	recv_ping_host->send = 0;
@@ -1614,9 +1674,7 @@ static void _fast_ping_period_run(void)
 		tv_sub(&interval, &ping_host->last);
 		tv_sub(&interval, &ping_host->last);
 		millisecond = interval.tv_sec * 1000 + interval.tv_usec / 1000;
 		millisecond = interval.tv_sec * 1000 + interval.tv_usec / 1000;
 		if (millisecond >= ping_host->timeout && ping_host->send == 1) {
 		if (millisecond >= ping_host->timeout && ping_host->send == 1) {
-			ping_host->ping_callback(ping_host, ping_host->host, PING_RESULT_TIMEOUT, &ping_host->addr,
-									 ping_host->addr_len, ping_host->seq, ping_host->ttl, &interval, ping_host->error,
-									 ping_host->userptr);
+			_fast_ping_send_notify_event(ping_host, PING_RESULT_TIMEOUT, ping_host->seq, ping_host->ttl, &interval);
 			ping_host->send = 0;
 			ping_host->send = 0;
 		}
 		}
 
 
@@ -1642,6 +1700,56 @@ static void _fast_ping_period_run(void)
 	}
 	}
 }
 }
 
 
+static void _fast_ping_process_notify_event(struct fast_ping_notify_event *ping_notify_event)
+{
+	struct ping_host_struct *ping_host = ping_notify_event->ping_host;
+	if (ping_host == NULL) {
+		return;
+	}
+
+	ping_host->ping_callback(ping_host, ping_host->host, ping_notify_event->ping_result, &ping_host->addr,
+							 ping_host->addr_len, ping_notify_event->seq, ping_notify_event->ttl,
+							 &ping_notify_event->tvresult, ping_host->error, ping_host->userptr);
+}
+
+static void *_fast_ping_notify_worker(void *arg)
+{
+	struct fast_ping_notify_event *ping_notify_event = NULL;
+
+	while (atomic_read(&ping.run)) {
+		pthread_mutex_lock(&ping.notify_lock);
+		if (list_empty(&ping.notify_event_list)) {
+			pthread_cond_wait(&ping.notify_cond, &ping.notify_lock);
+		}
+
+		ping_notify_event = list_first_entry_or_null(&ping.notify_event_list, struct fast_ping_notify_event, list);
+		if (ping_notify_event) {
+			list_del_init(&ping_notify_event->list);
+		}
+		pthread_mutex_unlock(&ping.notify_lock);
+
+		if (ping_notify_event == NULL) {
+			continue;
+		}
+
+		_fast_ping_process_notify_event(ping_notify_event);
+		_fast_ping_release_notify_event(ping_notify_event);
+	}
+
+	return NULL;
+}
+
+static void _fast_ping_remove_all_notify_event(void)
+{
+	struct fast_ping_notify_event *notify_event = NULL;
+	struct fast_ping_notify_event *tmp = NULL;
+	list_for_each_entry_safe(notify_event, tmp, &ping.notify_event_list, list)
+	{
+		_fast_ping_process_notify_event(notify_event);
+		_fast_ping_release_notify_event(notify_event);
+	}
+}
+
 static void *_fast_ping_work(void *arg)
 static void *_fast_ping_work(void *arg)
 {
 {
 	struct epoll_event events[PING_MAX_EVENTS + 1];
 	struct epoll_event events[PING_MAX_EVENTS + 1];
@@ -1767,13 +1875,25 @@ int fast_ping_init(void)
 
 
 	pthread_mutex_init(&ping.map_lock, NULL);
 	pthread_mutex_init(&ping.map_lock, NULL);
 	pthread_mutex_init(&ping.lock, NULL);
 	pthread_mutex_init(&ping.lock, NULL);
+	pthread_mutex_init(&ping.notify_lock, NULL);
+	pthread_cond_init(&ping.notify_cond, NULL);
+
+	INIT_LIST_HEAD(&ping.notify_event_list);
+
 	hash_init(ping.addrmap);
 	hash_init(ping.addrmap);
 	ping.no_unprivileged_ping = !has_unprivileged_ping();
 	ping.no_unprivileged_ping = !has_unprivileged_ping();
 	ping.ident = (getpid() & 0XFFFF);
 	ping.ident = (getpid() & 0XFFFF);
 	atomic_set(&ping.run, 1);
 	atomic_set(&ping.run, 1);
+
 	ret = pthread_create(&ping.tid, &attr, _fast_ping_work, NULL);
 	ret = pthread_create(&ping.tid, &attr, _fast_ping_work, NULL);
 	if (ret != 0) {
 	if (ret != 0) {
-		tlog(TLOG_ERROR, "create ping work thread failed, %s\n", strerror(errno));
+		tlog(TLOG_ERROR, "create ping work thread failed, %s\n", strerror(ret));
+		goto errout;
+	}
+
+	ret = pthread_create(&ping.notify_tid, &attr, _fast_ping_notify_worker, NULL);
+	if (ret != 0) {
+		tlog(TLOG_ERROR, "create ping notifyer work thread failed, %s\n", strerror(ret));
 		goto errout;
 		goto errout;
 	}
 	}
 
 
@@ -1786,9 +1906,18 @@ int fast_ping_init(void)
 
 
 	return 0;
 	return 0;
 errout:
 errout:
+	if (ping.notify_tid) {
+		void *retval = NULL;
+		atomic_set(&ping.run, 0);
+		pthread_cond_signal(&ping.notify_cond);
+		pthread_join(ping.notify_tid, &retval);
+		ping.notify_tid = 0;
+	}
+
 	if (ping.tid) {
 	if (ping.tid) {
 		void *retval = NULL;
 		void *retval = NULL;
 		atomic_set(&ping.run, 0);
 		atomic_set(&ping.run, 0);
+		_fast_ping_wakup_thread();
 		pthread_join(ping.tid, &retval);
 		pthread_join(ping.tid, &retval);
 		ping.tid = 0;
 		ping.tid = 0;
 	}
 	}
@@ -1802,6 +1931,8 @@ errout:
 		ping.event_fd = -1;
 		ping.event_fd = -1;
 	}
 	}
 
 
+	pthread_cond_destroy(&ping.notify_cond);
+	pthread_mutex_destroy(&ping.notify_lock);
 	pthread_mutex_destroy(&ping.lock);
 	pthread_mutex_destroy(&ping.lock);
 	pthread_mutex_destroy(&ping.map_lock);
 	pthread_mutex_destroy(&ping.map_lock);
 	memset(&ping, 0, sizeof(ping));
 	memset(&ping, 0, sizeof(ping));
@@ -1834,6 +1965,14 @@ static void _fast_ping_close_fds(void)
 
 
 void fast_ping_exit(void)
 void fast_ping_exit(void)
 {
 {
+	if (ping.notify_tid) {
+		void *retval = NULL;
+		atomic_set(&ping.run, 0);
+		pthread_cond_signal(&ping.notify_cond);
+		pthread_join(ping.notify_tid, &retval);
+		ping.notify_tid = 0;
+	}
+
 	if (ping.tid) {
 	if (ping.tid) {
 		void *ret = NULL;
 		void *ret = NULL;
 		atomic_set(&ping.run, 0);
 		atomic_set(&ping.run, 0);
@@ -1849,7 +1988,10 @@ void fast_ping_exit(void)
 
 
 	_fast_ping_close_fds();
 	_fast_ping_close_fds();
 	_fast_ping_remove_all();
 	_fast_ping_remove_all();
+	_fast_ping_remove_all_notify_event();
 
 
+	pthread_cond_destroy(&ping.notify_cond);
+	pthread_mutex_destroy(&ping.notify_lock);
 	pthread_mutex_destroy(&ping.lock);
 	pthread_mutex_destroy(&ping.lock);
 	pthread_mutex_destroy(&ping.map_lock);
 	pthread_mutex_destroy(&ping.map_lock);
 }
 }