delay_manager.cpp 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. /*
  2. * delay_manager.cpp
  3. *
  4. * Created on: Sep 15, 2017
  5. * Author: root
  6. */
  7. #include "delay_manager.h"
  8. #include "log.h"
  9. #include "packet.h"
  10. int delay_data_t::handle()
  11. {
  12. return my_send(dest,data,len)>=0;
  13. }
  14. delay_manager_t::delay_manager_t()
  15. {
  16. capacity=0;
  17. if ((timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK)) < 0)
  18. {
  19. mylog(log_fatal,"timer_fd create error");
  20. myexit(1);
  21. }
  22. itimerspec zero_its;
  23. memset(&zero_its, 0, sizeof(zero_its));
  24. timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &zero_its, 0);
  25. }
  26. delay_manager_t::~delay_manager_t()
  27. {
  28. //TODO ,we currently dont need to deconstruct it
  29. }
  30. int delay_manager_t::get_timer_fd()
  31. {
  32. return timer_fd;
  33. }
  34. //int add(my_time_t delay,const dest_t &dest,const char *data,int len);
  35. int delay_manager_t::add(my_time_t delay,const dest_t &dest,char *data,int len)
  36. {
  37. delay_data_t delay_data;
  38. delay_data.dest=dest;
  39. //delay_data.data=data;
  40. delay_data.len=len;
  41. if(capacity!=0&&int(delay_mp.size()) >=capacity)
  42. {
  43. mylog(log_warn,"max pending packet reached,ignored\n");
  44. return -1;
  45. }
  46. if(delay==0)
  47. {
  48. static char buf[buf_len];
  49. delay_data.data=buf;
  50. memcpy(buf,data,len);
  51. int ret=delay_data.handle();
  52. if (ret != 0) {
  53. mylog(log_trace, "handle() return %d\n", ret);
  54. }
  55. return 0;
  56. }
  57. delay_data_t tmp=delay_data;
  58. tmp.data=(char *)malloc(delay_data.len+100);
  59. if(!tmp.data)
  60. {
  61. mylog(log_warn, "malloc() returned null in delay_manager_t::add()");
  62. return -1;
  63. }
  64. memcpy(tmp.data,data,delay_data.len);
  65. my_time_t tmp_time=get_current_time_us();
  66. tmp_time+=delay;
  67. delay_mp.insert(make_pair(tmp_time,tmp));
  68. return 0;
  69. }
  70. int delay_manager_t::check()
  71. {
  72. if(!delay_mp.empty())
  73. {
  74. my_time_t current_time;
  75. multimap<my_time_t,delay_data_t>::iterator it;
  76. while(1)
  77. {
  78. int ret=0;
  79. it=delay_mp.begin();
  80. if(it==delay_mp.end()) break;
  81. current_time=get_current_time_us();
  82. if(it->first <= current_time)
  83. {
  84. ret=it->second.handle();
  85. if (ret != 0) {
  86. mylog(log_trace, "handle() return %d\n", ret);
  87. }
  88. free(it->second.data);
  89. delay_mp.erase(it);
  90. }
  91. else
  92. {
  93. break;
  94. }
  95. }
  96. if(!delay_mp.empty())
  97. {
  98. itimerspec its;
  99. memset(&its.it_interval,0,sizeof(its.it_interval));
  100. its.it_value.tv_sec=delay_mp.begin()->first/1000000llu;
  101. its.it_value.tv_nsec=(delay_mp.begin()->first%1000000llu)*1000llu;
  102. timerfd_settime(timer_fd,TFD_TIMER_ABSTIME,&its,0);
  103. }
  104. }
  105. return 0;
  106. }