Lines Matching refs:queue

108 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,  in _timeout_task_init()  argument
113 callout_init_mtx(&timeout_task->c, &queue->tq_mutex, in _timeout_task_init()
115 timeout_task->q = queue; in _timeout_task_init()
132 struct taskqueue *queue; in _taskqueue_create() local
139 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); in _taskqueue_create()
140 if (queue == NULL) { in _taskqueue_create()
147 STAILQ_INIT(&queue->tq_queue); in _taskqueue_create()
148 LIST_INIT(&queue->tq_active); in _taskqueue_create()
149 queue->tq_enqueue = enqueue; in _taskqueue_create()
150 queue->tq_context = context; in _taskqueue_create()
151 queue->tq_name = tq_name; in _taskqueue_create()
152 queue->tq_spin = (mtxflags & MTX_SPIN) != 0; in _taskqueue_create()
153 queue->tq_flags |= TQ_FLAGS_ACTIVE; in _taskqueue_create()
158 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; in _taskqueue_create()
159 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); in _taskqueue_create()
161 return (queue); in _taskqueue_create()
174 taskqueue_set_callback(struct taskqueue *queue, in taskqueue_set_callback() argument
183 KASSERT((queue->tq_callbacks[cb_type] == NULL), in taskqueue_set_callback()
186 queue->tq_callbacks[cb_type] = callback; in taskqueue_set_callback()
187 queue->tq_cb_contexts[cb_type] = context; in taskqueue_set_callback()
204 taskqueue_free(struct taskqueue *queue) in taskqueue_free() argument
207 TQ_LOCK(queue); in taskqueue_free()
208 queue->tq_flags &= ~TQ_FLAGS_ACTIVE; in taskqueue_free()
209 taskqueue_terminate(queue->tq_threads, queue); in taskqueue_free()
210 KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); in taskqueue_free()
211 KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); in taskqueue_free()
212 mtx_destroy(&queue->tq_mutex); in taskqueue_free()
213 free(queue->tq_threads, M_TASKQUEUE); in taskqueue_free()
214 free(queue->tq_name, M_TASKQUEUE); in taskqueue_free()
215 free(queue, M_TASKQUEUE); in taskqueue_free()
219 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) in taskqueue_enqueue_locked() argument
231 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
241 prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
243 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
245 prev = queue->tq_hint; in taskqueue_enqueue_locked()
250 ins = STAILQ_FIRST(&queue->tq_queue); in taskqueue_enqueue_locked()
257 STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); in taskqueue_enqueue_locked()
258 queue->tq_hint = task; in taskqueue_enqueue_locked()
260 STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); in taskqueue_enqueue_locked()
264 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) in taskqueue_enqueue_locked()
265 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
266 if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) in taskqueue_enqueue_locked()
267 queue->tq_enqueue(queue->tq_context); in taskqueue_enqueue_locked()
268 if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) in taskqueue_enqueue_locked()
269 TQ_UNLOCK(queue); in taskqueue_enqueue_locked()
276 taskqueue_enqueue(struct taskqueue *queue, struct task *task) in taskqueue_enqueue() argument
280 TQ_LOCK(queue); in taskqueue_enqueue()
281 res = taskqueue_enqueue_locked(queue, task); in taskqueue_enqueue()
290 struct taskqueue *queue; in taskqueue_timeout_func() local
294 queue = timeout_task->q; in taskqueue_timeout_func()
297 queue->tq_callouts--; in taskqueue_timeout_func()
303 taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, in taskqueue_enqueue_timeout_sbt() argument
308 TQ_LOCK(queue); in taskqueue_enqueue_timeout_sbt()
309 KASSERT(timeout_task->q == NULL || timeout_task->q == queue, in taskqueue_enqueue_timeout_sbt()
311 KASSERT(!queue->tq_spin, ("Timeout for spin-queue")); in taskqueue_enqueue_timeout_sbt()
312 timeout_task->q = queue; in taskqueue_enqueue_timeout_sbt()
316 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
319 taskqueue_enqueue_locked(queue, &timeout_task->t); in taskqueue_enqueue_timeout_sbt()
325 queue->tq_callouts++; in taskqueue_enqueue_timeout_sbt()
334 TQ_UNLOCK(queue); in taskqueue_enqueue_timeout_sbt()
340 taskqueue_enqueue_timeout(struct taskqueue *queue, in taskqueue_enqueue_timeout() argument
344 return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, in taskqueue_enqueue_timeout()
359 taskqueue_drain_tq_queue(struct taskqueue *queue) in taskqueue_drain_tq_queue() argument
363 if (STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_drain_tq_queue()
375 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); in taskqueue_drain_tq_queue()
376 queue->tq_hint = &t_barrier; in taskqueue_drain_tq_queue()
384 TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); in taskqueue_drain_tq_queue()
394 taskqueue_drain_tq_active(struct taskqueue *queue) in taskqueue_drain_tq_active() argument
399 if (LIST_EMPTY(&queue->tq_active)) in taskqueue_drain_tq_active()
403 queue->tq_callouts++; in taskqueue_drain_tq_active()
406 seq = queue->tq_seq; in taskqueue_drain_tq_active()
408 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in taskqueue_drain_tq_active()
410 TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); in taskqueue_drain_tq_active()
416 queue->tq_callouts--; in taskqueue_drain_tq_active()
417 if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) in taskqueue_drain_tq_active()
418 wakeup_one(queue->tq_threads); in taskqueue_drain_tq_active()
423 taskqueue_block(struct taskqueue *queue) in taskqueue_block() argument
426 TQ_LOCK(queue); in taskqueue_block()
427 queue->tq_flags |= TQ_FLAGS_BLOCKED; in taskqueue_block()
428 TQ_UNLOCK(queue); in taskqueue_block()
432 taskqueue_unblock(struct taskqueue *queue) in taskqueue_unblock() argument
435 TQ_LOCK(queue); in taskqueue_unblock()
436 queue->tq_flags &= ~TQ_FLAGS_BLOCKED; in taskqueue_unblock()
437 if (!STAILQ_EMPTY(&queue->tq_queue)) in taskqueue_unblock()
438 queue->tq_enqueue(queue->tq_context); in taskqueue_unblock()
439 TQ_UNLOCK(queue); in taskqueue_unblock()
443 taskqueue_run_locked(struct taskqueue *queue) in taskqueue_run_locked() argument
449 KASSERT(queue != NULL, ("tq is NULL")); in taskqueue_run_locked()
450 TQ_ASSERT_LOCKED(queue); in taskqueue_run_locked()
452 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); in taskqueue_run_locked()
454 while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { in taskqueue_run_locked()
455 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); in taskqueue_run_locked()
456 if (queue->tq_hint == task) in taskqueue_run_locked()
457 queue->tq_hint = NULL; in taskqueue_run_locked()
461 tb.tb_seq = ++queue->tq_seq; in taskqueue_run_locked()
462 TQ_UNLOCK(queue); in taskqueue_run_locked()
467 TQ_LOCK(queue); in taskqueue_run_locked()
474 taskqueue_run(struct taskqueue *queue) in taskqueue_run() argument
477 TQ_LOCK(queue); in taskqueue_run()
478 taskqueue_run_locked(queue); in taskqueue_run()
479 TQ_UNLOCK(queue); in taskqueue_run()
483 task_is_running(struct taskqueue *queue, struct task *task) in task_is_running() argument
487 TQ_ASSERT_LOCKED(queue); in task_is_running()
488 LIST_FOREACH(tb, &queue->tq_active, tb_link) { in task_is_running()
501 taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) in taskqueue_poll_is_busy() argument
505 TQ_LOCK(queue); in taskqueue_poll_is_busy()
506 retval = task->ta_pending > 0 || task_is_running(queue, task); in taskqueue_poll_is_busy()
507 TQ_UNLOCK(queue); in taskqueue_poll_is_busy()
513 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, in taskqueue_cancel_locked() argument
518 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); in taskqueue_cancel_locked()
519 if (queue->tq_hint == task) in taskqueue_cancel_locked()
520 queue->tq_hint = NULL; in taskqueue_cancel_locked()
525 return (task_is_running(queue, task) ? EBUSY : 0); in taskqueue_cancel_locked()
529 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) in taskqueue_cancel() argument
533 TQ_LOCK(queue); in taskqueue_cancel()
534 error = taskqueue_cancel_locked(queue, task, pendp); in taskqueue_cancel()
535 TQ_UNLOCK(queue); in taskqueue_cancel()
541 taskqueue_cancel_timeout(struct taskqueue *queue, in taskqueue_cancel_timeout() argument
547 TQ_LOCK(queue); in taskqueue_cancel_timeout()
549 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); in taskqueue_cancel_timeout()
552 queue->tq_callouts--; in taskqueue_cancel_timeout()
554 TQ_UNLOCK(queue); in taskqueue_cancel_timeout()
562 taskqueue_drain(struct taskqueue *queue, struct task *task) in taskqueue_drain() argument
565 if (!queue->tq_spin) in taskqueue_drain()
568 TQ_LOCK(queue); in taskqueue_drain()
569 while (task->ta_pending != 0 || task_is_running(queue, task)) in taskqueue_drain()
570 TQ_SLEEP(queue, task, "tq_drain"); in taskqueue_drain()
571 TQ_UNLOCK(queue); in taskqueue_drain()
575 taskqueue_drain_all(struct taskqueue *queue) in taskqueue_drain_all() argument
578 if (!queue->tq_spin) in taskqueue_drain_all()
581 TQ_LOCK(queue); in taskqueue_drain_all()
582 (void)taskqueue_drain_tq_queue(queue); in taskqueue_drain_all()
583 (void)taskqueue_drain_tq_active(queue); in taskqueue_drain_all()
584 TQ_UNLOCK(queue); in taskqueue_drain_all()
588 taskqueue_drain_timeout(struct taskqueue *queue, in taskqueue_drain_timeout() argument
595 TQ_LOCK(queue); in taskqueue_drain_timeout()
599 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
602 taskqueue_drain(queue, &timeout_task->t); in taskqueue_drain_timeout()
607 TQ_LOCK(queue); in taskqueue_drain_timeout()
609 TQ_UNLOCK(queue); in taskqueue_drain_timeout()
613 taskqueue_quiesce(struct taskqueue *queue) in taskqueue_quiesce() argument
617 TQ_LOCK(queue); in taskqueue_quiesce()
619 ret = taskqueue_drain_tq_queue(queue); in taskqueue_quiesce()
621 ret = taskqueue_drain_tq_active(queue); in taskqueue_quiesce()
623 TQ_UNLOCK(queue); in taskqueue_quiesce()
855 taskqueue_member(struct taskqueue *queue, struct thread *td) in taskqueue_member() argument
860 if (queue->tq_threads[i] == NULL) in taskqueue_member()
862 if (queue->tq_threads[i] == td) { in taskqueue_member()
866 if (++j >= queue->tq_tcount) in taskqueue_member()