1 /*-
2 * Copyright (c) 2000 Doug Rabson
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
29
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/bus.h>
33 #include <sys/cpuset.h>
34 #include <sys/interrupt.h>
35 #include <sys/kernel.h>
36 #include <sys/kthread.h>
37 #include <sys/libkern.h>
38 #include <sys/limits.h>
39 #include <sys/lock.h>
40 #include <sys/malloc.h>
41 #include <sys/mutex.h>
42 #include <sys/proc.h>
43 #include <sys/sched.h>
44 #include <sys/smp.h>
45 #include <sys/taskqueue.h>
46 #include <sys/unistd.h>
47 #include <machine/stdarg.h>
48
/* Malloc type used for taskqueue allocations in this file. */
static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
/* Software-interrupt cookies filled in by swi_add() for the swi queues. */
static void *taskqueue_giant_ih;
static void *taskqueue_ih;
/*
 * Forward declarations: _taskqueue_create() compares enqueue callbacks
 * against these to decide on TQ_FLAGS_UNLOCKED_ENQUEUE.
 */
static void taskqueue_fast_enqueue(void *);
static void taskqueue_swi_enqueue(void *);
static void taskqueue_swi_giant_enqueue(void *);
55
/*
 * One entry on a queue's tq_active list.  taskqueue_run_locked() links one
 * in for each thread that is running tasks, with tb_running pointing at the
 * task being executed (NULL between tasks).  Drain waiters link in a marker
 * whose tb_running is TB_DRAIN_WAITER.
 */
struct taskqueue_busy {
	struct task		*tb_running;	/* running task, NULL, or TB_DRAIN_WAITER */
	TAILQ_ENTRY(taskqueue_busy) tb_link;	/* linkage on tq_active */
};
60
/*
 * Sentinel stored in taskqueue_busy.tb_running to mark a drain waiter's
 * marker entry on tq_active.  It is only compared against, never
 * dereferenced.  Declared static so this file-private symbol does not leak
 * into the kernel's global namespace.
 */
static struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
62
/*
 * A task queue.  Its state is manipulated under tq_mutex, which is a spin
 * mutex when tq_spin is set (queues made by taskqueue_create_fast()) and a
 * default mutex otherwise.
 */
struct taskqueue {
	STAILQ_HEAD(, task)	tq_queue;	/* pending tasks, highest priority first */
	taskqueue_enqueue_fn	tq_enqueue;	/* invoked when work is queued */
	void			*tq_context;	/* argument for tq_enqueue */
	char			*tq_name;	/* queue/mutex name; freed by taskqueue_free() */
	TAILQ_HEAD(, taskqueue_busy) tq_active;	/* running threads and drain markers */
	struct mtx		tq_mutex;
	struct thread		**tq_threads;	/* workers; NULL slots where kthread_add() failed */
	int			tq_tcount;	/* number of live worker threads */
	int			tq_spin;	/* non-zero: tq_mutex is MTX_SPIN */
	int			tq_flags;	/* TQ_FLAGS_* */
	int			tq_callouts;	/* armed timeouts plus in-progress active drains */
	taskqueue_callback_fn	tq_callbacks[TASKQUEUE_NUM_CALLBACKS];	/* per-thread init/shutdown hooks */
	void			*tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];	/* arguments for tq_callbacks */
};
78
/* tq_flags bits. */
#define	TQ_FLAGS_ACTIVE		(1 << 0)	/* cleared by taskqueue_free() */
#define	TQ_FLAGS_BLOCKED	(1 << 1)	/* taskqueue_block(): tq_enqueue not called */
#define	TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2)	/* call tq_enqueue with tq_mutex dropped */

/* timeout_task.f bits. */
#define	DT_CALLOUT_ARMED	(1 << 0)	/* callout scheduled; counted in tq_callouts */
84
/*
 * Lock or unlock a queue's tq_mutex using the primitive that matches its
 * type: spin mutex when tq_spin is set, default mutex otherwise.
 */
#define	TQ_LOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_lock_spin(&(tq)->tq_mutex);			\
		else							\
			mtx_lock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_LOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_OWNED)

#define	TQ_UNLOCK(tq)							\
	do {								\
		if ((tq)->tq_spin)					\
			mtx_unlock_spin(&(tq)->tq_mutex);		\
		else							\
			mtx_unlock(&(tq)->tq_mutex);			\
	} while (0)
#define	TQ_ASSERT_UNLOCKED(tq)	mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
102
103 void
_timeout_task_init(struct taskqueue * queue,struct timeout_task * timeout_task,int priority,task_fn_t func,void * context)104 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
105 int priority, task_fn_t func, void *context)
106 {
107
108 TASK_INIT(&timeout_task->t, priority, func, context);
109 callout_init_mtx(&timeout_task->c, &queue->tq_mutex,
110 CALLOUT_RETURNUNLOCKED);
111 timeout_task->q = queue;
112 timeout_task->f = 0;
113 }
114
115 static __inline int
TQ_SLEEP(struct taskqueue * tq,void * p,struct mtx * m,int pri,const char * wm,int t)116 TQ_SLEEP(struct taskqueue *tq, void *p, struct mtx *m, int pri, const char *wm,
117 int t)
118 {
119 if (tq->tq_spin)
120 return (msleep_spin(p, m, wm, t));
121 return (msleep(p, m, pri, wm, t));
122 }
123
124 static struct taskqueue *
_taskqueue_create(const char * name,int mflags,taskqueue_enqueue_fn enqueue,void * context,int mtxflags,const char * mtxname __unused)125 _taskqueue_create(const char *name, int mflags,
126 taskqueue_enqueue_fn enqueue, void *context,
127 int mtxflags, const char *mtxname __unused)
128 {
129 struct taskqueue *queue;
130 char *tq_name = NULL;
131
132 if (name != NULL)
133 tq_name = strndup(name, 32, M_TASKQUEUE);
134 if (tq_name == NULL)
135 tq_name = "taskqueue";
136
137 queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
138 if (!queue)
139 return NULL;
140
141 STAILQ_INIT(&queue->tq_queue);
142 TAILQ_INIT(&queue->tq_active);
143 queue->tq_enqueue = enqueue;
144 queue->tq_context = context;
145 queue->tq_name = tq_name;
146 queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
147 queue->tq_flags |= TQ_FLAGS_ACTIVE;
148 if (enqueue == taskqueue_fast_enqueue ||
149 enqueue == taskqueue_swi_enqueue ||
150 enqueue == taskqueue_swi_giant_enqueue ||
151 enqueue == taskqueue_thread_enqueue)
152 queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
153 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
154
155 return queue;
156 }
157
158 struct taskqueue *
taskqueue_create(const char * name,int mflags,taskqueue_enqueue_fn enqueue,void * context)159 taskqueue_create(const char *name, int mflags,
160 taskqueue_enqueue_fn enqueue, void *context)
161 {
162
163 return _taskqueue_create(name, mflags, enqueue, context,
164 MTX_DEF, name);
165 }
166
167 void
taskqueue_set_callback(struct taskqueue * queue,enum taskqueue_callback_type cb_type,taskqueue_callback_fn callback,void * context)168 taskqueue_set_callback(struct taskqueue *queue,
169 enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback,
170 void *context)
171 {
172
173 KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) &&
174 (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)),
175 ("Callback type %d not valid, must be %d-%d", cb_type,
176 TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX));
177 KASSERT((queue->tq_callbacks[cb_type] == NULL),
178 ("Re-initialization of taskqueue callback?"));
179
180 queue->tq_callbacks[cb_type] = callback;
181 queue->tq_cb_contexts[cb_type] = context;
182 }
183
184 /*
185 * Signal a taskqueue thread to terminate.
186 */
187 static void
taskqueue_terminate(struct thread ** pp,struct taskqueue * tq)188 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
189 {
190
191 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
192 wakeup(tq);
193 TQ_SLEEP(tq, pp, &tq->tq_mutex, PWAIT, "taskqueue_destroy", 0);
194 }
195 }
196
/*
 * Destroy "queue".  Clear TQ_FLAGS_ACTIVE so worker threads leave their
 * loop, then wait in taskqueue_terminate() until every worker has exited
 * and tq_callouts is zero.  Finally destroy the mutex and free the thread
 * array, the name and the queue itself.  tq_name must therefore be heap
 * memory of type M_TASKQUEUE.
 */
void
taskqueue_free(struct taskqueue *queue)
{

	TQ_LOCK(queue);
	queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
	taskqueue_terminate(queue->tq_threads, queue);
	KASSERT(TAILQ_EMPTY(&queue->tq_active), ("Tasks still running?"));
	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
	/* The mutex is destroyed while still owned; it is not unlocked first. */
	mtx_destroy(&queue->tq_mutex);
	free(queue->tq_threads, M_TASKQUEUE);
	free(queue->tq_name, M_TASKQUEUE);
	free(queue, M_TASKQUEUE);
}
211
/*
 * Queue "task" on "queue".  Called with tq_mutex held.  Always returns 0,
 * with the lock released.
 *
 * If the task is already pending, only its pending count is incremented
 * (saturating at USHRT_MAX).  Otherwise the task is inserted behind every
 * queued task of equal or higher priority, so the queue stays ordered by
 * descending priority and FIFO within a priority.  tq_enqueue is then
 * invoked unless the queue is blocked.  With TQ_FLAGS_UNLOCKED_ENQUEUE,
 * tq_enqueue is called after the lock is dropped; otherwise it is called
 * with the lock held.
 */
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
	struct task *ins;
	struct task *prev;

	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
	/*
	 * Count multiple enqueues.  The task is already on tq_queue, so
	 * there is nothing to schedule.
	 */
	if (task->ta_pending) {
		if (task->ta_pending < USHRT_MAX)
			task->ta_pending++;
		TQ_UNLOCK(queue);
		return (0);
	}

	/*
	 * Optimise the case when all tasks have the same priority: if the
	 * tail's priority is not lower than ours, we belong at the end.
	 */
	prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
	if (!prev || prev->ta_priority >= task->ta_priority) {
		STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
	} else {
		/* Insert before the first task with a lower priority. */
		prev = NULL;
		for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
		    prev = ins, ins = STAILQ_NEXT(ins, ta_link))
			if (ins->ta_priority < task->ta_priority)
				break;

		if (prev)
			STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
		else
			STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
	}

	task->ta_pending = 1;
	/*
	 * NOTE(review): when TQ_FLAGS_UNLOCKED_ENQUEUE is set, tq_flags is
	 * read below after tq_mutex has been dropped.
	 */
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0)
		TQ_UNLOCK(queue);
	if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
		queue->tq_enqueue(queue->tq_context);
	if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
		TQ_UNLOCK(queue);

	/* Return with lock released. */
	return (0);
}
259
/*
 * Queue "task" on "queue", or bump its pending count if it is already
 * queued.  Always returns 0.
 */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{

	TQ_LOCK(queue);
	/* taskqueue_enqueue_locked() drops the queue lock before returning. */
	return (taskqueue_enqueue_locked(queue, task));
}
271
272 static void
taskqueue_timeout_func(void * arg)273 taskqueue_timeout_func(void *arg)
274 {
275 struct taskqueue *queue;
276 struct timeout_task *timeout_task;
277
278 timeout_task = arg;
279 queue = timeout_task->q;
280 KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
281 timeout_task->f &= ~DT_CALLOUT_ARMED;
282 queue->tq_callouts--;
283 taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
284 /* The lock is released inside. */
285 }
286
287 int
taskqueue_enqueue_timeout(struct taskqueue * queue,struct timeout_task * timeout_task,int ticks)288 taskqueue_enqueue_timeout(struct taskqueue *queue,
289 struct timeout_task *timeout_task, int ticks)
290 {
291 int res;
292
293 TQ_LOCK(queue);
294 KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
295 ("Migrated queue"));
296 KASSERT(!queue->tq_spin, ("Timeout for spin-queue"));
297 timeout_task->q = queue;
298 res = timeout_task->t.ta_pending;
299 if (ticks == 0) {
300 taskqueue_enqueue_locked(queue, &timeout_task->t);
301 /* The lock is released inside. */
302 } else {
303 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
304 res++;
305 } else {
306 queue->tq_callouts++;
307 timeout_task->f |= DT_CALLOUT_ARMED;
308 if (ticks < 0)
309 ticks = -ticks; /* Ignore overflow. */
310 }
311 if (ticks > 0) {
312 callout_reset(&timeout_task->c, ticks,
313 taskqueue_timeout_func, timeout_task);
314 }
315 TQ_UNLOCK(queue);
316 }
317 return (res);
318 }
319
/*
 * Task function that does nothing.  It is the body of the barrier task used
 * by taskqueue_drain_tq_queue().
 */
static void
taskqueue_task_nop_fn(void *context, int pending)
{
	(void)context;
	(void)pending;
}
324
/*
 * Block until all currently queued tasks in this taskqueue
 * have begun execution.  Tasks queued during execution of
 * this function are ignored.  Called with tq_mutex held; the
 * mutex is dropped while sleeping and held again on return.
 *
 * NOTE(review): tq_enqueue is not invoked for the barrier, so this appears
 * to rely on the non-empty queue already having been scheduled.  On a
 * blocked queue the wait presumably lasts until taskqueue_unblock().
 */
static void
taskqueue_drain_tq_queue(struct taskqueue *queue)
{
	struct task t_barrier;

	if (STAILQ_EMPTY(&queue->tq_queue))
		return;

	/*
	 * Enqueue our barrier after all current tasks, but with
	 * the highest priority so that newly queued tasks cannot
	 * pass it.  Because of the high priority, we cannot use
	 * taskqueue_enqueue_locked() directly (which drops the lock
	 * anyway), so just insert it at the tail while we hold the
	 * queue lock.  The barrier lives on this stack frame, so we
	 * must not return until a worker has finished with it.
	 */
	TASK_INIT(&t_barrier, USHRT_MAX, taskqueue_task_nop_fn, &t_barrier);
	STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
	t_barrier.ta_pending = 1;

	/*
	 * Once the barrier has executed, all previously queued tasks
	 * have completed or are currently executing.  taskqueue_run_locked()
	 * zeroes ta_pending when it dequeues the barrier, and calls wakeup()
	 * on it after running it.
	 */
	while (t_barrier.ta_pending != 0)
		TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}
357
/*
 * Block until all currently executing tasks for this taskqueue
 * complete.  Tasks that begin execution during the execution
 * of this function are ignored.  Called with tq_mutex held; the
 * mutex is dropped while sleeping and held again on return.
 *
 * A marker entry (tb_running == TB_DRAIN_WAITER) is appended to
 * tq_active.  A worker that finishes a task wakes the marker once
 * the marker is at the head of the list, i.e. once every entry that
 * was ahead of it has been removed.
 */
static void
taskqueue_drain_tq_active(struct taskqueue *queue)
{
	struct taskqueue_busy tb_marker, *tb_first;

	if (TAILQ_EMPTY(&queue->tq_active))
		return;

	/* Block taskqueue_terminate() while our marker is on tq_active. */
	queue->tq_callouts++;

	/*
	 * Wait for all currently executing taskqueue threads
	 * to go idle.
	 */
	tb_marker.tb_running = TB_DRAIN_WAITER;
	TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
	while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
		TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
	TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);

	/*
	 * Wakeup any other drain waiter that happened to queue up
	 * without any intervening active thread.
	 */
	tb_first = TAILQ_FIRST(&queue->tq_active);
	if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
		wakeup(tb_first);

	/*
	 * Release taskqueue_terminate(); if the queue is being freed, wake
	 * the thread waiting there.
	 */
	queue->tq_callouts--;
	if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
		wakeup_one(queue->tq_threads);
}
397
398 void
taskqueue_block(struct taskqueue * queue)399 taskqueue_block(struct taskqueue *queue)
400 {
401
402 TQ_LOCK(queue);
403 queue->tq_flags |= TQ_FLAGS_BLOCKED;
404 TQ_UNLOCK(queue);
405 }
406
407 void
taskqueue_unblock(struct taskqueue * queue)408 taskqueue_unblock(struct taskqueue *queue)
409 {
410
411 TQ_LOCK(queue);
412 queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
413 if (!STAILQ_EMPTY(&queue->tq_queue))
414 queue->tq_enqueue(queue->tq_context);
415 TQ_UNLOCK(queue);
416 }
417
/*
 * Run tasks from "queue" in the calling thread until the queue is empty.
 * Called with tq_mutex held.  The mutex is dropped around each task
 * function and held again on return.
 *
 * While a task runs, the stack-allocated busy entry "tb" is on tq_active
 * with tb_running pointing at the task.  task_is_running() and
 * taskqueue_drain_tq_active() inspect these entries.
 */
static void
taskqueue_run_locked(struct taskqueue *queue)
{
	struct taskqueue_busy tb;
	struct taskqueue_busy *tb_first;
	struct task *task;
	int pending;

	KASSERT(queue != NULL, ("tq is NULL"));
	TQ_ASSERT_LOCKED(queue);
	tb.tb_running = NULL;

	while (STAILQ_FIRST(&queue->tq_queue)) {
		TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);

		/*
		 * Carefully remove the first task from the queue and
		 * zero its pending count.  The old count (how many
		 * enqueues were coalesced) is passed to the task function.
		 */
		task = STAILQ_FIRST(&queue->tq_queue);
		KASSERT(task != NULL, ("task is NULL"));
		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
		pending = task->ta_pending;
		task->ta_pending = 0;
		tb.tb_running = task;
		TQ_UNLOCK(queue);

		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
		task->ta_func(task->ta_context, pending);

		TQ_LOCK(queue);
		tb.tb_running = NULL;
		/* Wake taskqueue_drain() and barrier waiters sleeping on the task. */
		wakeup(task);

		TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
		/*
		 * If a drain waiter's marker is now at the head of tq_active,
		 * every entry that was ahead of it is gone; wake the waiter.
		 */
		tb_first = TAILQ_FIRST(&queue->tq_active);
		if (tb_first != NULL &&
		    tb_first->tb_running == TB_DRAIN_WAITER)
			wakeup(tb_first);
	}
}
459
/*
 * Run queued tasks in the calling thread until the queue is empty.
 * tq_mutex is taken here and released before returning.
 */
void
taskqueue_run(struct taskqueue *queue)
{
	TQ_LOCK(queue);
	taskqueue_run_locked(queue);
	TQ_UNLOCK(queue);
}
468
469 static int
task_is_running(struct taskqueue * queue,struct task * task)470 task_is_running(struct taskqueue *queue, struct task *task)
471 {
472 struct taskqueue_busy *tb;
473
474 TQ_ASSERT_LOCKED(queue);
475 TAILQ_FOREACH(tb, &queue->tq_active, tb_link) {
476 if (tb->tb_running == task)
477 return (1);
478 }
479 return (0);
480 }
481
482 static int
taskqueue_cancel_locked(struct taskqueue * queue,struct task * task,u_int * pendp)483 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
484 u_int *pendp)
485 {
486
487 if (task->ta_pending > 0)
488 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
489 if (pendp != NULL)
490 *pendp = task->ta_pending;
491 task->ta_pending = 0;
492 return (task_is_running(queue, task) ? EBUSY : 0);
493 }
494
495 int
taskqueue_cancel(struct taskqueue * queue,struct task * task,u_int * pendp)496 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
497 {
498 int error;
499
500 TQ_LOCK(queue);
501 error = taskqueue_cancel_locked(queue, task, pendp);
502 TQ_UNLOCK(queue);
503
504 return (error);
505 }
506
507 int
taskqueue_cancel_timeout(struct taskqueue * queue,struct timeout_task * timeout_task,u_int * pendp)508 taskqueue_cancel_timeout(struct taskqueue *queue,
509 struct timeout_task *timeout_task, u_int *pendp)
510 {
511 u_int pending, pending1;
512 int error;
513
514 TQ_LOCK(queue);
515 pending = !!(callout_stop(&timeout_task->c) > 0);
516 error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
517 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
518 timeout_task->f &= ~DT_CALLOUT_ARMED;
519 queue->tq_callouts--;
520 }
521 TQ_UNLOCK(queue);
522
523 if (pendp != NULL)
524 *pendp = pending + pending1;
525 return (error);
526 }
527
528 void
taskqueue_drain(struct taskqueue * queue,struct task * task)529 taskqueue_drain(struct taskqueue *queue, struct task *task)
530 {
531
532 if (!queue->tq_spin)
533 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
534
535 TQ_LOCK(queue);
536 while (task->ta_pending != 0 || task_is_running(queue, task))
537 TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
538 TQ_UNLOCK(queue);
539 }
540
541 void
taskqueue_drain_all(struct taskqueue * queue)542 taskqueue_drain_all(struct taskqueue *queue)
543 {
544
545 if (!queue->tq_spin)
546 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
547
548 TQ_LOCK(queue);
549 taskqueue_drain_tq_queue(queue);
550 taskqueue_drain_tq_active(queue);
551 TQ_UNLOCK(queue);
552 }
553
/*
 * Wait for a timeout task's callout to be stopped or finish, then wait for
 * the embedded task to be neither queued nor running.
 *
 * NOTE(review): if callout_drain() cancels a still-pending callout,
 * taskqueue_timeout_func() never runs.  DT_CALLOUT_ARMED then stays set and
 * tq_callouts stays incremented, which taskqueue_terminate() waits on.
 * Confirm that callers cancel the timeout (taskqueue_cancel_timeout())
 * before draining.
 */
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

	callout_drain(&timeout_task->c);
	taskqueue_drain(queue, &timeout_task->t);
}
562
563 static void
taskqueue_swi_enqueue(void * context)564 taskqueue_swi_enqueue(void *context)
565 {
566 swi_sched(taskqueue_ih, 0);
567 }
568
569 static void
taskqueue_swi_run(void * dummy)570 taskqueue_swi_run(void *dummy)
571 {
572 taskqueue_run(taskqueue_swi);
573 }
574
575 static void
taskqueue_swi_giant_enqueue(void * context)576 taskqueue_swi_giant_enqueue(void *context)
577 {
578 swi_sched(taskqueue_giant_ih, 0);
579 }
580
581 static void
taskqueue_swi_giant_run(void * dummy)582 taskqueue_swi_giant_run(void *dummy)
583 {
584 taskqueue_run(taskqueue_swi_giant);
585 }
586
587 static int
_taskqueue_start_threads(struct taskqueue ** tqp,int count,int pri,cpuset_t * mask,const char * name,va_list ap)588 _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
589 cpuset_t *mask, const char *name, va_list ap)
590 {
591 char ktname[MAXCOMLEN + 1];
592 struct thread *td;
593 struct taskqueue *tq;
594 int i, error;
595
596 if (count <= 0)
597 return (EINVAL);
598
599 vsnprintf(ktname, sizeof(ktname), name, ap);
600 tq = *tqp;
601
602 tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE,
603 M_NOWAIT | M_ZERO);
604 if (tq->tq_threads == NULL) {
605 printf("%s: no memory for %s threads\n", __func__, ktname);
606 return (ENOMEM);
607 }
608
609 for (i = 0; i < count; i++) {
610 if (count == 1)
611 error = kthread_add(taskqueue_thread_loop, tqp, NULL,
612 &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname);
613 else
614 error = kthread_add(taskqueue_thread_loop, tqp, NULL,
615 &tq->tq_threads[i], RFSTOPPED, 0,
616 "%s_%d", ktname, i);
617 if (error) {
618 /* should be ok to continue, taskqueue_free will dtrt */
619 printf("%s: kthread_add(%s): error %d", __func__,
620 ktname, error);
621 tq->tq_threads[i] = NULL; /* paranoid */
622 } else
623 tq->tq_tcount++;
624 }
625 for (i = 0; i < count; i++) {
626 if (tq->tq_threads[i] == NULL)
627 continue;
628 td = tq->tq_threads[i];
629 if (mask) {
630 error = cpuset_setthread(td->td_tid, mask);
631 /*
632 * Failing to pin is rarely an actual fatal error;
633 * it'll just affect performance.
634 */
635 if (error)
636 printf("%s: curthread=%llu: can't pin; "
637 "error=%d\n",
638 __func__,
639 (unsigned long long) td->td_tid,
640 error);
641 }
642 thread_lock(td);
643 sched_prio(td, pri);
644 sched_add(td, SRQ_BORING);
645 thread_unlock(td);
646 }
647
648 return (0);
649 }
650
651 int
taskqueue_start_threads(struct taskqueue ** tqp,int count,int pri,const char * name,...)652 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
653 const char *name, ...)
654 {
655 va_list ap;
656 int error;
657
658 va_start(ap, name);
659 error = _taskqueue_start_threads(tqp, count, pri, NULL, name, ap);
660 va_end(ap);
661 return (error);
662 }
663
664 int
taskqueue_start_threads_cpuset(struct taskqueue ** tqp,int count,int pri,cpuset_t * mask,const char * name,...)665 taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri,
666 cpuset_t *mask, const char *name, ...)
667 {
668 va_list ap;
669 int error;
670
671 va_start(ap, name);
672 error = _taskqueue_start_threads(tqp, count, pri, mask, name, ap);
673 va_end(ap);
674 return (error);
675 }
676
677 static inline void
taskqueue_run_callback(struct taskqueue * tq,enum taskqueue_callback_type cb_type)678 taskqueue_run_callback(struct taskqueue *tq,
679 enum taskqueue_callback_type cb_type)
680 {
681 taskqueue_callback_fn tq_callback;
682
683 TQ_ASSERT_UNLOCKED(tq);
684 tq_callback = tq->tq_callbacks[cb_type];
685 if (tq_callback != NULL)
686 tq_callback(tq->tq_cb_contexts[cb_type]);
687 }
688
/*
 * Main loop of a taskqueue worker thread started by
 * _taskqueue_start_threads().  "arg" is the address of the queue pointer.
 *
 * The thread runs the INIT callback, then alternates between running
 * queued tasks and sleeping on the queue until taskqueue_free() clears
 * TQ_FLAGS_ACTIVE.  It then runs any remaining tasks and the SHUTDOWN
 * callback, decrements tq_tcount, wakes taskqueue_terminate(), and exits.
 */
void
taskqueue_thread_loop(void *arg)
{
	struct taskqueue **tqp, *tq;

	tqp = arg;
	tq = *tqp;
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
	TQ_LOCK(tq);
	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
		/* XXX ? */
		taskqueue_run_locked(tq);
		/*
		 * Because taskqueue_run_locked() can drop tq_mutex, we need
		 * to check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
		 * meantime, which means we missed a wakeup.
		 */
		if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
			break;
		/* Woken by taskqueue_thread_enqueue() or taskqueue_terminate(). */
		TQ_SLEEP(tq, tq, &tq->tq_mutex, 0, "-", 0);
	}
	/* Run whatever is still queued before shutting down. */
	taskqueue_run_locked(tq);

	/*
	 * This thread is on its way out, so just drop the lock temporarily
	 * in order to call the shutdown callback. This allows the callback
	 * to look at the taskqueue, even just before it dies.
	 */
	TQ_UNLOCK(tq);
	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
	TQ_LOCK(tq);

	/* rendezvous with thread that asked us to terminate */
	tq->tq_tcount--;
	wakeup_one(tq->tq_threads);
	TQ_UNLOCK(tq);
	kthread_exit();
}
727
/*
 * Enqueue hook for thread-backed taskqueues.  "context" is the address of
 * the taskqueue pointer; wake one worker sleeping on the queue.
 */
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue *tq;

	tq = *(struct taskqueue **)context;
	wakeup_one(tq);
}
738
/* System-wide taskqueue run from the MPSAFE "task queue" software interrupt. */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL,
    swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ,
    INTR_MPSAFE, &taskqueue_ih));

/*
 * System-wide taskqueue run from the "Giant taskq" software interrupt, which
 * is registered without INTR_MPSAFE.
 */
TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL,
    swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run,
    NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih));

/*
 * System-wide thread-backed taskqueue (presumably serviced by worker
 * thread(s) set up by TASKQUEUE_DEFINE_THREAD in sys/taskqueue.h).
 */
TASKQUEUE_DEFINE_THREAD(thread);
748
749 struct taskqueue *
taskqueue_create_fast(const char * name,int mflags,taskqueue_enqueue_fn enqueue,void * context)750 taskqueue_create_fast(const char *name, int mflags,
751 taskqueue_enqueue_fn enqueue, void *context)
752 {
753 return _taskqueue_create(name, mflags, enqueue, context,
754 MTX_SPIN, "fast_taskqueue");
755 }
756
/*
 * NB: kept for backwards compatibility; this is simply
 * taskqueue_enqueue().
 */
int
taskqueue_enqueue_fast(struct taskqueue *queue, struct task *task)
{
	int rc;

	rc = taskqueue_enqueue(queue, task);
	return (rc);
}
763
/* Software-interrupt cookie for taskqueue_fast, filled in by swi_add(). */
static void *taskqueue_fast_ih;

/* Enqueue hook for taskqueue_fast: schedule its software interrupt. */
static void
taskqueue_fast_enqueue(void *context)
{
	void *cookie = taskqueue_fast_ih;

	swi_sched(cookie, 0);
}
771
772 static void
taskqueue_fast_run(void * dummy)773 taskqueue_fast_run(void *dummy)
774 {
775 taskqueue_run(taskqueue_fast);
776 }
777
/*
 * System-wide "fast" taskqueue, run by taskqueue_fast_run() from the MPSAFE
 * "fast taskq" software interrupt.
 */
TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL,
    swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL,
    SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih));
781
782 int
taskqueue_member(struct taskqueue * queue,struct thread * td)783 taskqueue_member(struct taskqueue *queue, struct thread *td)
784 {
785 int i, j, ret = 0;
786
787 for (i = 0, j = 0; ; i++) {
788 if (queue->tq_threads[i] == NULL)
789 continue;
790 if (queue->tq_threads[i] == td) {
791 ret = 1;
792 break;
793 }
794 if (++j >= queue->tq_tcount)
795 break;
796 }
797 return (ret);
798 }
799
/* One slot of a task-queue group: a taskqueue plus the grouptasks on it. */
struct taskqgroup_cpu {
	LIST_HEAD(, grouptask)	tgc_tasks;	/* grouptasks attached to this slot */
	struct taskqueue	*tgc_taskq;	/* fast taskqueue with one worker thread */
	int			tgc_cnt;	/* number of entries on tgc_tasks */
	int			tgc_cpu;	/* CPU the slot is pinned to (idx * stride) */
};
806
/* A group of per-CPU taskqueues that grouptasks are spread across. */
struct taskqgroup {
	struct taskqgroup_cpu	tqg_queue[MAXCPU];	/* slots; [0, tqg_cnt) in use */
	struct mtx		tqg_lock;		/* serializes attach/detach/adjust */
	char			*tqg_name;		/* base name for worker threads (not copied) */
	int			tqg_adjusting;		/* _taskqgroup_adjust() in progress */
	int			tqg_stride;		/* CPU spacing between slots */
	int			tqg_cnt;		/* number of slots in use */
};
815
/*
 * One-shot task that pins the taskqueue thread running it to bt_cpuid.
 * taskqgroup_binder() frees it.
 */
struct taskq_bind_task {
	struct task	bt_task;
	int		bt_cpuid;
};
820
821 static void
taskqgroup_cpu_create(struct taskqgroup * qgroup,int idx)822 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
823 {
824 struct taskqgroup_cpu *qcpu;
825
826 qcpu = &qgroup->tqg_queue[idx];
827 LIST_INIT(&qcpu->tgc_tasks);
828 qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
829 taskqueue_thread_enqueue, &qcpu->tgc_taskq);
830 taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
831 "%s_%d", qgroup->tqg_name, idx);
832 qcpu->tgc_cpu = idx * qgroup->tqg_stride;
833 }
834
835 static void
taskqgroup_cpu_remove(struct taskqgroup * qgroup,int idx)836 taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
837 {
838
839 taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
840 }
841
/*
 * Find the taskq with least # of tasks that doesn't currently have any
 * other queues from the uniq identifier.  Called with tqg_lock held.
 *
 * Returns slot 0 while the group has no slots yet (before the first
 * taskqgroup_adjust()).  Among slots with equal minimum counts, the one with
 * the highest index wins, because the comparison below skips only strictly
 * larger counts.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
	struct grouptask *n;
	int i, idx, mincnt;
	int strict;

	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
	if (qgroup->tqg_cnt == 0)
		return (0);
	idx = -1;
	mincnt = INT_MAX;
	/*
	 * Two passes.  The first scans for a queue with the least tasks that
	 * does not already service this uniq id.  If that fails (mincnt is
	 * still INT_MAX), simply find the queue with the least total tasks.
	 */
	for (strict = 1; mincnt == INT_MAX; strict = 0) {
		for (i = 0; i < qgroup->tqg_cnt; i++) {
			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
				continue;
			if (strict) {
				/* Skip slots already serving this uniq. */
				LIST_FOREACH(n,
				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
					if (n->gt_uniq == uniq)
						break;
				if (n != NULL)
					continue;
			}
			mincnt = qgroup->tqg_queue[i].tgc_cnt;
			idx = i;
		}
	}
	/* Not expected to trigger: the non-strict pass accepts any slot. */
	if (idx == -1)
		panic("taskqgroup_find: Failed to pick a qid.");

	return (idx);
}
884
885 void
taskqgroup_attach(struct taskqgroup * qgroup,struct grouptask * gtask,void * uniq,int irq,char * name)886 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
887 void *uniq, int irq, char *name)
888 {
889 cpuset_t mask;
890 int qid;
891
892 gtask->gt_uniq = uniq;
893 gtask->gt_name = name;
894 gtask->gt_irq = irq;
895 mtx_lock(&qgroup->tqg_lock);
896 qid = taskqgroup_find(qgroup, uniq);
897 qgroup->tqg_queue[qid].tgc_cnt++;
898 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
899 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
900 if (irq != -1 && smp_started) {
901 CPU_ZERO(&mask);
902 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
903 mtx_unlock(&qgroup->tqg_lock);
904 intr_setaffinity(irq, &mask);
905 } else
906 mtx_unlock(&qgroup->tqg_lock);
907 }
908
909 int
taskqgroup_attach_cpu(struct taskqgroup * qgroup,struct grouptask * gtask,void * uniq,int cpu,int irq,char * name)910 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
911 void *uniq, int cpu, int irq, char *name)
912 {
913 cpuset_t mask;
914 int i, qid;
915
916 qid = -1;
917 gtask->gt_uniq = uniq;
918 gtask->gt_name = name;
919 gtask->gt_irq = irq;
920 mtx_lock(&qgroup->tqg_lock);
921 for (i = 0; i < qgroup->tqg_cnt; i++)
922 if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
923 qid = i;
924 break;
925 }
926 if (qid == -1) {
927 mtx_unlock(&qgroup->tqg_lock);
928 return (EINVAL);
929 }
930 qgroup->tqg_queue[qid].tgc_cnt++;
931 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
932 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
933 if (irq != -1 && smp_started) {
934 CPU_ZERO(&mask);
935 CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
936 mtx_unlock(&qgroup->tqg_lock);
937 intr_setaffinity(irq, &mask);
938 } else
939 mtx_unlock(&qgroup->tqg_lock);
940 return (0);
941 }
942
943 void
taskqgroup_detach(struct taskqgroup * qgroup,struct grouptask * gtask)944 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
945 {
946 int i;
947
948 mtx_lock(&qgroup->tqg_lock);
949 for (i = 0; i < qgroup->tqg_cnt; i++)
950 if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
951 break;
952 if (i == qgroup->tqg_cnt)
953 panic("taskqgroup_detach: task not in group\n");
954 qgroup->tqg_queue[i].tgc_cnt--;
955 LIST_REMOVE(gtask, gt_list);
956 mtx_unlock(&qgroup->tqg_lock);
957 gtask->gt_taskqueue = NULL;
958 }
959
960 static void
taskqgroup_binder(void * ctx,int pending)961 taskqgroup_binder(void *ctx, int pending)
962 {
963 struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
964 cpuset_t mask;
965 int error;
966
967 CPU_ZERO(&mask);
968 CPU_SET(task->bt_cpuid, &mask);
969 error = cpuset_setthread(curthread->td_tid, &mask);
970 if (error)
971 printf("taskqgroup_binder: setaffinity failed: %d\n",
972 error);
973 free(task, M_DEVBUF);
974 }
975
976 static void
taskqgroup_bind(struct taskqgroup * qgroup)977 taskqgroup_bind(struct taskqgroup *qgroup)
978 {
979 struct taskq_bind_task *task;
980 int i;
981
982 /*
983 * Bind taskqueue threads to specific CPUs, if they have been assigned
984 * one.
985 */
986 for (i = 0; i < qgroup->tqg_cnt; i++) {
987 task = malloc(sizeof (*task), M_DEVBUF, M_NOWAIT);
988 TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
989 task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
990 taskqueue_enqueue_fast(qgroup->tqg_queue[i].tgc_taskq,
991 &task->bt_task);
992 }
993 }
994
995 static int
_taskqgroup_adjust(struct taskqgroup * qgroup,int cnt,int stride)996 _taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
997 {
998 LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
999 cpuset_t mask;
1000 struct grouptask *gtask;
1001 int i, old_cnt, qid;
1002
1003 mtx_assert(&qgroup->tqg_lock, MA_OWNED);
1004
1005 if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
1006 printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
1007 cnt, stride, mp_ncpus, smp_started);
1008 return (EINVAL);
1009 }
1010 if (qgroup->tqg_adjusting) {
1011 printf("taskqgroup_adjust failed: adjusting\n");
1012 return (EBUSY);
1013 }
1014 qgroup->tqg_adjusting = 1;
1015 old_cnt = qgroup->tqg_cnt;
1016 mtx_unlock(&qgroup->tqg_lock);
1017 /*
1018 * Set up queue for tasks added before boot.
1019 */
1020 if (old_cnt == 0) {
1021 LIST_SWAP(>ask_head, &qgroup->tqg_queue[0].tgc_tasks,
1022 grouptask, gt_list);
1023 qgroup->tqg_queue[0].tgc_cnt = 0;
1024 }
1025
1026 /*
1027 * If new taskq threads have been added.
1028 */
1029 for (i = old_cnt; i < cnt; i++)
1030 taskqgroup_cpu_create(qgroup, i);
1031 mtx_lock(&qgroup->tqg_lock);
1032 qgroup->tqg_cnt = cnt;
1033 qgroup->tqg_stride = stride;
1034
1035 /*
1036 * Adjust drivers to use new taskqs.
1037 */
1038 for (i = 0; i < old_cnt; i++) {
1039 while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
1040 LIST_REMOVE(gtask, gt_list);
1041 qgroup->tqg_queue[i].tgc_cnt--;
1042 LIST_INSERT_HEAD(>ask_head, gtask, gt_list);
1043 }
1044 }
1045
1046 while ((gtask = LIST_FIRST(>ask_head))) {
1047 LIST_REMOVE(gtask, gt_list);
1048 qid = taskqgroup_find(qgroup, gtask->gt_uniq);
1049 qgroup->tqg_queue[qid].tgc_cnt++;
1050 LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
1051 gt_list);
1052 gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
1053 }
1054 /*
1055 * Set new CPU and IRQ affinity
1056 */
1057 for (i = 0; i < cnt; i++) {
1058 qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride;
1059 CPU_ZERO(&mask);
1060 CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
1061 LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
1062 if (gtask->gt_irq == -1)
1063 continue;
1064 intr_setaffinity(gtask->gt_irq, &mask);
1065 }
1066 }
1067 mtx_unlock(&qgroup->tqg_lock);
1068
1069 /*
1070 * If taskq thread count has been reduced.
1071 */
1072 for (i = cnt; i < old_cnt; i++)
1073 taskqgroup_cpu_remove(qgroup, i);
1074
1075 mtx_lock(&qgroup->tqg_lock);
1076 qgroup->tqg_adjusting = 0;
1077
1078 taskqgroup_bind(qgroup);
1079
1080 return (0);
1081 }
1082
1083 int
taskqgroup_adjust(struct taskqgroup * qgroup,int cpu,int stride)1084 taskqgroup_adjust(struct taskqgroup *qgroup, int cpu, int stride)
1085 {
1086 int error;
1087
1088 mtx_lock(&qgroup->tqg_lock);
1089 error = _taskqgroup_adjust(qgroup, cpu, stride);
1090 mtx_unlock(&qgroup->tqg_lock);
1091
1092 return (error);
1093 }
1094
1095 struct taskqgroup *
taskqgroup_create(char * name)1096 taskqgroup_create(char *name)
1097 {
1098 struct taskqgroup *qgroup;
1099
1100 qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
1101 mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
1102 qgroup->tqg_name = name;
1103 LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
1104
1105 return (qgroup);
1106 }
1107
/*
 * Destroy a task-queue group.
 *
 * NOTE(review): this is currently a no-op.  The group's taskqueues,
 * tqg_lock, and the structure allocated by taskqgroup_create() are not
 * released.  Confirm what callers expect before implementing teardown.
 */
void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

}
1113