123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- #include "task.h"
- #include "bmem.h"
- #include "threading.h"
- #include "deque.h"
- struct os_task_queue {
- pthread_t thread;
- os_sem_t *sem;
- long id;
- bool waiting;
- bool tasks_processed;
- os_event_t *wait_event;
- pthread_mutex_t mutex;
- struct deque tasks;
- };
- struct os_task_info {
- os_task_t task;
- void *param;
- };
- static THREAD_LOCAL bool exit_thread = false;
- static THREAD_LOCAL long thread_id = 0;
- static volatile long thread_id_counter = 1;
- static void *tiny_tubular_task_thread(void *param);
- os_task_queue_t *os_task_queue_create(void)
- {
- struct os_task_queue *tq = bzalloc(sizeof(*tq));
- tq->id = os_atomic_inc_long(&thread_id_counter);
- if (pthread_mutex_init(&tq->mutex, NULL) != 0)
- goto fail1;
- if (os_sem_init(&tq->sem, 0) != 0)
- goto fail2;
- if (os_event_init(&tq->wait_event, OS_EVENT_TYPE_AUTO) != 0)
- goto fail3;
- if (pthread_create(&tq->thread, NULL, tiny_tubular_task_thread, tq) !=
- 0)
- goto fail4;
- return tq;
- fail4:
- os_event_destroy(tq->wait_event);
- fail3:
- os_sem_destroy(tq->sem);
- fail2:
- pthread_mutex_destroy(&tq->mutex);
- fail1:
- bfree(tq);
- return NULL;
- }
- bool os_task_queue_queue_task(os_task_queue_t *tq, os_task_t task, void *param)
- {
- struct os_task_info ti = {
- task,
- param,
- };
- if (!tq)
- return false;
- pthread_mutex_lock(&tq->mutex);
- deque_push_back(&tq->tasks, &ti, sizeof(ti));
- pthread_mutex_unlock(&tq->mutex);
- os_sem_post(tq->sem);
- return true;
- }
- static void wait_for_thread(void *data)
- {
- os_task_queue_t *tq = data;
- os_event_signal(tq->wait_event);
- }
- static void stop_thread(void *unused)
- {
- exit_thread = true;
- UNUSED_PARAMETER(unused);
- }
- void os_task_queue_destroy(os_task_queue_t *tq)
- {
- if (!tq)
- return;
- os_task_queue_queue_task(tq, stop_thread, NULL);
- pthread_join(tq->thread, NULL);
- os_event_destroy(tq->wait_event);
- os_sem_destroy(tq->sem);
- pthread_mutex_destroy(&tq->mutex);
- deque_free(&tq->tasks);
- bfree(tq);
- }
- bool os_task_queue_wait(os_task_queue_t *tq)
- {
- if (!tq)
- return false;
- struct os_task_info ti = {
- wait_for_thread,
- tq,
- };
- pthread_mutex_lock(&tq->mutex);
- tq->waiting = true;
- tq->tasks_processed = false;
- deque_push_back(&tq->tasks, &ti, sizeof(ti));
- pthread_mutex_unlock(&tq->mutex);
- os_sem_post(tq->sem);
- os_event_wait(tq->wait_event);
- pthread_mutex_lock(&tq->mutex);
- bool tasks_processed = tq->tasks_processed;
- pthread_mutex_unlock(&tq->mutex);
- return tasks_processed;
- }
- bool os_task_queue_inside(os_task_queue_t *tq)
- {
- return tq->id == thread_id;
- }
- static void *tiny_tubular_task_thread(void *param)
- {
- struct os_task_queue *tq = param;
- thread_id = tq->id;
- os_set_thread_name(__FUNCTION__);
- while (!exit_thread && os_sem_wait(tq->sem) == 0) {
- struct os_task_info ti;
- pthread_mutex_lock(&tq->mutex);
- deque_pop_front(&tq->tasks, &ti, sizeof(ti));
- if (tq->tasks.size && ti.task == wait_for_thread) {
- deque_push_back(&tq->tasks, &ti, sizeof(ti));
- deque_pop_front(&tq->tasks, &ti, sizeof(ti));
- }
- if (tq->tasks.size && ti.task == stop_thread) {
- deque_push_back(&tq->tasks, &ti, sizeof(ti));
- deque_pop_front(&tq->tasks, &ti, sizeof(ti));
- }
- if (tq->waiting) {
- if (ti.task == wait_for_thread) {
- tq->waiting = false;
- } else {
- tq->tasks_processed = true;
- }
- }
- pthread_mutex_unlock(&tq->mutex);
- ti.task(ti.param);
- }
- return NULL;
- }
|