1
0

task.c 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. #include "task.h"
  2. #include "bmem.h"
  3. #include "threading.h"
  4. #include "deque.h"
  5. struct os_task_queue {
  6. pthread_t thread;
  7. os_sem_t *sem;
  8. long id;
  9. bool waiting;
  10. bool tasks_processed;
  11. os_event_t *wait_event;
  12. pthread_mutex_t mutex;
  13. struct deque tasks;
  14. };
  15. struct os_task_info {
  16. os_task_t task;
  17. void *param;
  18. };
  19. static THREAD_LOCAL bool exit_thread = false;
  20. static THREAD_LOCAL long thread_id = 0;
  21. static volatile long thread_id_counter = 1;
  22. static void *tiny_tubular_task_thread(void *param);
  23. os_task_queue_t *os_task_queue_create(void)
  24. {
  25. struct os_task_queue *tq = bzalloc(sizeof(*tq));
  26. tq->id = os_atomic_inc_long(&thread_id_counter);
  27. if (pthread_mutex_init(&tq->mutex, NULL) != 0)
  28. goto fail1;
  29. if (os_sem_init(&tq->sem, 0) != 0)
  30. goto fail2;
  31. if (os_event_init(&tq->wait_event, OS_EVENT_TYPE_AUTO) != 0)
  32. goto fail3;
  33. if (pthread_create(&tq->thread, NULL, tiny_tubular_task_thread, tq) !=
  34. 0)
  35. goto fail4;
  36. return tq;
  37. fail4:
  38. os_event_destroy(tq->wait_event);
  39. fail3:
  40. os_sem_destroy(tq->sem);
  41. fail2:
  42. pthread_mutex_destroy(&tq->mutex);
  43. fail1:
  44. bfree(tq);
  45. return NULL;
  46. }
  47. bool os_task_queue_queue_task(os_task_queue_t *tq, os_task_t task, void *param)
  48. {
  49. struct os_task_info ti = {
  50. task,
  51. param,
  52. };
  53. if (!tq)
  54. return false;
  55. pthread_mutex_lock(&tq->mutex);
  56. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  57. pthread_mutex_unlock(&tq->mutex);
  58. os_sem_post(tq->sem);
  59. return true;
  60. }
  61. static void wait_for_thread(void *data)
  62. {
  63. os_task_queue_t *tq = data;
  64. os_event_signal(tq->wait_event);
  65. }
  66. static void stop_thread(void *unused)
  67. {
  68. exit_thread = true;
  69. UNUSED_PARAMETER(unused);
  70. }
  71. void os_task_queue_destroy(os_task_queue_t *tq)
  72. {
  73. if (!tq)
  74. return;
  75. os_task_queue_queue_task(tq, stop_thread, NULL);
  76. pthread_join(tq->thread, NULL);
  77. os_event_destroy(tq->wait_event);
  78. os_sem_destroy(tq->sem);
  79. pthread_mutex_destroy(&tq->mutex);
  80. deque_free(&tq->tasks);
  81. bfree(tq);
  82. }
  83. bool os_task_queue_wait(os_task_queue_t *tq)
  84. {
  85. if (!tq)
  86. return false;
  87. struct os_task_info ti = {
  88. wait_for_thread,
  89. tq,
  90. };
  91. pthread_mutex_lock(&tq->mutex);
  92. tq->waiting = true;
  93. tq->tasks_processed = false;
  94. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  95. pthread_mutex_unlock(&tq->mutex);
  96. os_sem_post(tq->sem);
  97. os_event_wait(tq->wait_event);
  98. pthread_mutex_lock(&tq->mutex);
  99. bool tasks_processed = tq->tasks_processed;
  100. pthread_mutex_unlock(&tq->mutex);
  101. return tasks_processed;
  102. }
  103. bool os_task_queue_inside(os_task_queue_t *tq)
  104. {
  105. return tq->id == thread_id;
  106. }
  107. static void *tiny_tubular_task_thread(void *param)
  108. {
  109. struct os_task_queue *tq = param;
  110. thread_id = tq->id;
  111. os_set_thread_name(__FUNCTION__);
  112. while (!exit_thread && os_sem_wait(tq->sem) == 0) {
  113. struct os_task_info ti;
  114. pthread_mutex_lock(&tq->mutex);
  115. deque_pop_front(&tq->tasks, &ti, sizeof(ti));
  116. if (tq->tasks.size && ti.task == wait_for_thread) {
  117. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  118. deque_pop_front(&tq->tasks, &ti, sizeof(ti));
  119. }
  120. if (tq->tasks.size && ti.task == stop_thread) {
  121. deque_push_back(&tq->tasks, &ti, sizeof(ti));
  122. deque_pop_front(&tq->tasks, &ti, sizeof(ti));
  123. }
  124. if (tq->waiting) {
  125. if (ti.task == wait_for_thread) {
  126. tq->waiting = false;
  127. } else {
  128. tq->tasks_processed = true;
  129. }
  130. }
  131. pthread_mutex_unlock(&tq->mutex);
  132. ti.task(ti.param);
  133. }
  134. return NULL;
  135. }