task.c 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. #include "task.h"
  2. #include "bmem.h"
  3. #include "threading.h"
  4. #include "deque.h"
  5. struct os_task_queue {
  6. pthread_t thread;
  7. os_sem_t *sem;
  8. long id;
  9. bool waiting;
  10. bool tasks_processed;
  11. os_event_t *wait_event;
  12. pthread_mutex_t mutex;
  13. struct deque tasks;
  14. };
  15. struct os_task_info {
  16. os_task_t task;
  17. void *param;
  18. };
  19. static THREAD_LOCAL bool exit_thread = false;
  20. static THREAD_LOCAL long thread_id = 0;
  21. static volatile long thread_id_counter = 1;
  22. static void *tiny_tubular_task_thread(void *param);
  23. os_task_queue_t *os_task_queue_create(void)
  24. {
  25. struct os_task_queue *tq = bzalloc(sizeof(*tq));
  26. tq->id = os_atomic_inc_long(&thread_id_counter);
  27. if (pthread_mutex_init(&tq->mutex, NULL) != 0)
  28. goto fail1;
  29. if (os_sem_init(&tq->sem, 0) != 0)
  30. goto fail2;
  31. if (os_event_init(&tq->wait_event, OS_EVENT_TYPE_AUTO) != 0)
  32. goto fail3;
  33. if (pthread_create(&tq->thread, NULL, tiny_tubular_task_thread, tq) != 0)
  34. goto fail4;
  35. return tq;
  36. fail4:
  37. os_event_destroy(tq->wait_event);
  38. fail3:
  39. os_sem_destroy(tq->sem);
  40. fail2:
  41. pthread_mutex_destroy(&tq->mutex);
  42. fail1:
  43. bfree(tq);
  44. return NULL;
  45. }
  46. bool os_task_queue_queue_task(os_task_queue_t *tq, os_task_t task, void *param)
  47. {
  48. struct os_task_info ti = {
  49. task,
  50. param,
  51. };
  52. if (!tq)
  53. return false;
  54. pthread_mutex_lock(&tq->mutex);
  55. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  56. pthread_mutex_unlock(&tq->mutex);
  57. os_sem_post(tq->sem);
  58. return true;
  59. }
  60. static void wait_for_thread(void *data)
  61. {
  62. os_task_queue_t *tq = data;
  63. os_event_signal(tq->wait_event);
  64. }
  65. static void stop_thread(void *unused)
  66. {
  67. exit_thread = true;
  68. UNUSED_PARAMETER(unused);
  69. }
  70. void os_task_queue_destroy(os_task_queue_t *tq)
  71. {
  72. if (!tq)
  73. return;
  74. os_task_queue_queue_task(tq, stop_thread, NULL);
  75. pthread_join(tq->thread, NULL);
  76. os_event_destroy(tq->wait_event);
  77. os_sem_destroy(tq->sem);
  78. pthread_mutex_destroy(&tq->mutex);
  79. deque_free(&tq->tasks);
  80. bfree(tq);
  81. }
  82. bool os_task_queue_wait(os_task_queue_t *tq)
  83. {
  84. if (!tq)
  85. return false;
  86. struct os_task_info ti = {
  87. wait_for_thread,
  88. tq,
  89. };
  90. pthread_mutex_lock(&tq->mutex);
  91. tq->waiting = true;
  92. tq->tasks_processed = false;
  93. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  94. pthread_mutex_unlock(&tq->mutex);
  95. os_sem_post(tq->sem);
  96. os_event_wait(tq->wait_event);
  97. pthread_mutex_lock(&tq->mutex);
  98. bool tasks_processed = tq->tasks_processed;
  99. pthread_mutex_unlock(&tq->mutex);
  100. return tasks_processed;
  101. }
  102. bool os_task_queue_inside(os_task_queue_t *tq)
  103. {
  104. return tq->id == thread_id;
  105. }
  106. static void *tiny_tubular_task_thread(void *param)
  107. {
  108. struct os_task_queue *tq = param;
  109. thread_id = tq->id;
  110. os_set_thread_name(__FUNCTION__);
  111. while (!exit_thread && os_sem_wait(tq->sem) == 0) {
  112. struct os_task_info ti;
  113. pthread_mutex_lock(&tq->mutex);
  114. deque_pop_front(&tq->tasks, &ti, sizeof(ti));
  115. if (tq->tasks.size && ti.task == wait_for_thread) {
  116. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  117. deque_pop_front(&tq->tasks, &ti, sizeof(ti));
  118. }
  119. if (tq->tasks.size && ti.task == stop_thread) {
  120. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  121. deque_pop_front(&tq->tasks, &ti, sizeof(ti));
  122. }
  123. if (tq->waiting) {
  124. if (ti.task == wait_for_thread) {
  125. tq->waiting = false;
  126. } else {
  127. tq->tasks_processed = true;
  128. }
  129. }
  130. pthread_mutex_unlock(&tq->mutex);
  131. ti.task(ti.param);
  132. }
  133. return NULL;
  134. }