xref: /dragonfly/sys/kern/subr_gtaskqueue.c (revision 5694b1af4cd69deb286f1d476515c715d7269a0f)
1 /*-
2  * Copyright (c) 2000 Doug Rabson
3  * Copyright (c) 2014 Jeff Roberson
4  * Copyright (c) 2016 Matthew Macy
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 #include <sys/cdefs.h>
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/bus.h>
33 #include <sys/cpumask.h>
34 #include <sys/kernel.h>
35 #include <sys/libkern.h>
36 #include <sys/limits.h>
37 #include <sys/lock.h>
38 #include <sys/malloc.h>
39 #include <sys/proc.h>
40 #include <sys/sched.h>
41 #include <sys/gtaskqueue.h>
42 #include <sys/unistd.h>
43 #include <machine/stdarg.h>
44 
45 static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues");
46 static void         gtaskqueue_thread_enqueue(void *);
47 static void         gtaskqueue_thread_loop(void *arg);
48 static int          task_is_running(struct gtaskqueue *queue, struct gtask *gtask);
49 static void         gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask);
50 
51 TASKQGROUP_DEFINE(softirq, ncpus, 1);
52 
53 struct gtaskqueue_busy {
54           struct gtask                  *tb_running;
55           u_int                          tb_seq;
56           LIST_ENTRY(gtaskqueue_busy) tb_link;
57 };
58 
59 typedef void (*gtaskqueue_enqueue_fn)(void *context);
60 
61 struct gtaskqueue {
62           STAILQ_HEAD(, gtask)          tq_queue;
63           LIST_HEAD(, gtaskqueue_busy) tq_active;
64           u_int                         tq_seq;
65           int                           tq_callouts;
66           struct lock                   tq_lock;
67           gtaskqueue_enqueue_fn         tq_enqueue;
68           void                          *tq_context;
69           const char                    *tq_name;
70           struct thread                 **tq_threads;
71           int                           tq_tcount;
72           int                           tq_flags;
73 #if 0
74           taskqueue_callback_fn         tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
75           void                          *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
76 #endif
77 };
78 
79 #define   TQ_FLAGS_ACTIVE               (1 << 0)
80 #define   TQ_FLAGS_BLOCKED    (1 << 1)
81 #define   TQ_FLAGS_UNLOCKED_ENQUEUE     (1 << 2)
82 
83 #define   DT_CALLOUT_ARMED    (1 << 0)
84 
85 #define   TQ_LOCK(tq)                   lockmgr(&(tq)->tq_lock, LK_EXCLUSIVE)
86 #define   TQ_ASSERT_LOCKED(tq)          KKASSERT(lockstatus(&(tq)->tq_lock, NULL) != 0)
87 #define   TQ_UNLOCK(tq)                 lockmgr(&(tq)->tq_lock, LK_RELEASE);
88 #define   TQ_ASSERT_UNLOCKED(tq)        KKASSERT(lockstatus(&(tq)->tq_lock) == 0)
89 
90 #ifdef INVARIANTS
91 static void
gtask_dump(struct gtask * gtask)92 gtask_dump(struct gtask *gtask)
93 {
94           kprintf("gtask: %p ta_flags=%x ta_priority=%d ta_func=%p "
95                     "ta_context=%p\n",
96                     gtask, gtask->ta_flags, gtask->ta_priority,
97                     gtask->ta_func, gtask->ta_context);
98 }
99 #endif
100 
101 static __inline int
TQ_SLEEP(struct gtaskqueue * tq,void * p,const char * wm)102 TQ_SLEEP(struct gtaskqueue *tq, void *p, const char *wm)
103 {
104           return (lksleep(p, &tq->tq_lock, 0, wm, 0));
105 }
106 
107 static struct gtaskqueue *
_gtaskqueue_create(const char * name,int mflags,taskqueue_enqueue_fn enqueue,void * context,int lkflags,const char * mtxname __unused)108 _gtaskqueue_create(const char *name, int mflags,
109                      taskqueue_enqueue_fn enqueue, void *context,
110                      int lkflags, const char *mtxname __unused)
111 {
112           struct gtaskqueue *queue;
113 
114           queue = kmalloc(sizeof(struct gtaskqueue),
115                               M_GTASKQUEUE, mflags | M_ZERO);
116           if (!queue) {
117                     kprintf("_gtaskqueue_create: kmalloc failed %08x\n", mflags);
118                     return (NULL);
119           }
120 
121           STAILQ_INIT(&queue->tq_queue);
122           LIST_INIT(&queue->tq_active);
123           queue->tq_enqueue = enqueue;
124           queue->tq_context = context;
125           queue->tq_name = name ? name : "taskqueue";
126           queue->tq_flags |= TQ_FLAGS_ACTIVE;
127           if (enqueue == gtaskqueue_thread_enqueue)
128                     queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
129           lockinit(&queue->tq_lock, queue->tq_name, 0, 0);
130 
131           return (queue);
132 }
133 
134 /*
135  * Signal a taskqueue thread to terminate.
136  */
137 static void
gtaskqueue_terminate(struct thread ** pp,struct gtaskqueue * tq)138 gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
139 {
140 
141           while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
142                     wakeup(tq);
143                     TQ_SLEEP(tq, pp, "gtq_destroy");
144           }
145 }
146 
147 static void __unused
gtaskqueue_free(struct gtaskqueue * queue)148 gtaskqueue_free(struct gtaskqueue *queue)
149 {
150 
151           TQ_LOCK(queue);
152           queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
153           gtaskqueue_terminate(queue->tq_threads, queue);
154           KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?"));
155           KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
156           lockuninit(&queue->tq_lock);
157           kfree(queue->tq_threads, M_GTASKQUEUE);
158           /*kfree(queue->tq_name, M_GTASKQUEUE);*/
159           kfree(queue, M_GTASKQUEUE);
160 }
161 
162 /*
163  * Wait for all to complete, then prevent it from being enqueued
164  */
165 void
grouptask_block(struct grouptask * grouptask)166 grouptask_block(struct grouptask *grouptask)
167 {
168           struct gtaskqueue *queue = grouptask->gt_taskqueue;
169           struct gtask *gtask = &grouptask->gt_task;
170 
171 #ifdef INVARIANTS
172           if (queue == NULL) {
173                     gtask_dump(gtask);
174                     panic("queue == NULL");
175           }
176 #endif
177           TQ_LOCK(queue);
178           gtask->ta_flags |= TASK_NOENQUEUE;
179           gtaskqueue_drain_locked(queue, gtask);
180           TQ_UNLOCK(queue);
181 }
182 
183 void
grouptask_unblock(struct grouptask * grouptask)184 grouptask_unblock(struct grouptask *grouptask)
185 {
186           struct gtaskqueue *queue = grouptask->gt_taskqueue;
187           struct gtask *gtask = &grouptask->gt_task;
188 
189 #ifdef INVARIANTS
190           if (queue == NULL) {
191                     gtask_dump(gtask);
192                     panic("queue == NULL");
193           }
194 #endif
195           TQ_LOCK(queue);
196           gtask->ta_flags &= ~TASK_NOENQUEUE;
197           TQ_UNLOCK(queue);
198 }
199 
200 int
grouptaskqueue_enqueue(struct gtaskqueue * queue,struct gtask * gtask)201 grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
202 {
203 #ifdef INVARIANTS
204           if (queue == NULL) {
205                     gtask_dump(gtask);
206                     panic("queue == NULL");
207           }
208 #endif
209           TQ_LOCK(queue);
210           if (gtask->ta_flags & TASK_ENQUEUED) {
211                     TQ_UNLOCK(queue);
212                     return (0);
213           }
214           if (gtask->ta_flags & TASK_NOENQUEUE) {
215                     TQ_UNLOCK(queue);
216                     return (EAGAIN);
217           }
218           STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
219           gtask->ta_flags |= TASK_ENQUEUED;
220           TQ_UNLOCK(queue);
221           if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
222                     queue->tq_enqueue(queue->tq_context);
223           return (0);
224 }
225 
226 static void
gtaskqueue_task_nop_fn(void * context)227 gtaskqueue_task_nop_fn(void *context)
228 {
229 }
230 
231 /*
232  * Block until all currently queued tasks in this taskqueue
233  * have begun execution.  Tasks queued during execution of
234  * this function are ignored.
235  */
236 static void
gtaskqueue_drain_tq_queue(struct gtaskqueue * queue)237 gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
238 {
239           struct gtask t_barrier;
240 
241           if (STAILQ_EMPTY(&queue->tq_queue))
242                     return;
243 
244           /*
245            * Enqueue our barrier after all current tasks, but with
246            * the highest priority so that newly queued tasks cannot
247            * pass it.  Because of the high priority, we can not use
248            * taskqueue_enqueue_locked directly (which drops the lock
249            * anyway) so just insert it at tail while we have the
250            * queue lock.
251            */
252           GTASK_INIT(&t_barrier, 0, USHRT_MAX, gtaskqueue_task_nop_fn, &t_barrier);
253           STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
254           t_barrier.ta_flags |= TASK_ENQUEUED;
255 
256           /*
257            * Once the barrier has executed, all previously queued tasks
258            * have completed or are currently executing.
259            */
260           while (t_barrier.ta_flags & TASK_ENQUEUED)
261                     TQ_SLEEP(queue, &t_barrier, "gtq_qdrain");
262 }
263 
264 /*
265  * Block until all currently executing tasks for this taskqueue
266  * complete.  Tasks that begin execution during the execution
267  * of this function are ignored.
268  */
269 static void
gtaskqueue_drain_tq_active(struct gtaskqueue * queue)270 gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
271 {
272           struct gtaskqueue_busy *tb;
273           u_int seq;
274 
275           if (LIST_EMPTY(&queue->tq_active))
276                     return;
277 
278           /* Block taskq_terminate().*/
279           queue->tq_callouts++;
280 
281           /* Wait for any active task with sequence from the past. */
282           seq = queue->tq_seq;
283 restart:
284           LIST_FOREACH(tb, &queue->tq_active, tb_link) {
285                     if ((int)(tb->tb_seq - seq) <= 0) {
286                               TQ_SLEEP(queue, tb->tb_running, "gtq_adrain");
287                               goto restart;
288                     }
289           }
290 
291           /* Release taskqueue_terminate(). */
292           queue->tq_callouts--;
293           if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
294                     wakeup_one(queue->tq_threads);
295 }
296 
297 void
gtaskqueue_block(struct gtaskqueue * queue)298 gtaskqueue_block(struct gtaskqueue *queue)
299 {
300 
301           TQ_LOCK(queue);
302           queue->tq_flags |= TQ_FLAGS_BLOCKED;
303           TQ_UNLOCK(queue);
304 }
305 
306 void
gtaskqueue_unblock(struct gtaskqueue * queue)307 gtaskqueue_unblock(struct gtaskqueue *queue)
308 {
309 
310           TQ_LOCK(queue);
311           queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
312           if (!STAILQ_EMPTY(&queue->tq_queue))
313                     queue->tq_enqueue(queue->tq_context);
314           TQ_UNLOCK(queue);
315 }
316 
317 static void
gtaskqueue_run_locked(struct gtaskqueue * queue)318 gtaskqueue_run_locked(struct gtaskqueue *queue)
319 {
320           struct gtaskqueue_busy tb;
321           struct gtask *gtask;
322 #if 0
323           struct epoch_tracker et;
324           bool in_net_epoch;
325 #endif
326 
327           KASSERT(queue != NULL, ("tq is NULL"));
328           TQ_ASSERT_LOCKED(queue);
329           tb.tb_running = NULL;
330           LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
331 #if 0
332           in_net_epoch = false;
333 #endif
334 
335           while ((gtask = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
336                     STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
337                     gtask->ta_flags &= ~TASK_ENQUEUED;
338                     tb.tb_running = gtask;
339                     tb.tb_seq = ++queue->tq_seq;
340                     TQ_UNLOCK(queue);
341 
342                     KASSERT(gtask->ta_func != NULL, ("task->ta_func is NULL"));
343 #if 0
344                     if (!in_net_epoch && TASK_IS_NET(gtask)) {
345                               in_net_epoch = true;
346                               NET_EPOCH_ENTER(et);
347                     } else if (in_net_epoch && !TASK_IS_NET(gtask)) {
348                               NET_EPOCH_EXIT(et);
349                               in_net_epoch = false;
350                     }
351 #endif
352                     gtask->ta_func(gtask->ta_context);
353 
354                     TQ_LOCK(queue);
355                     wakeup(gtask);
356           }
357 #if 0
358           if (in_net_epoch)
359                     NET_EPOCH_EXIT(et);
360 #endif
361           LIST_REMOVE(&tb, tb_link);
362 }
363 
364 static int
task_is_running(struct gtaskqueue * queue,struct gtask * gtask)365 task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
366 {
367           struct gtaskqueue_busy *tb;
368 
369           TQ_ASSERT_LOCKED(queue);
370           LIST_FOREACH(tb, &queue->tq_active, tb_link) {
371                     if (tb->tb_running == gtask)
372                               return (1);
373           }
374           return (0);
375 }
376 
377 static int
gtaskqueue_cancel_locked(struct gtaskqueue * queue,struct gtask * gtask)378 gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
379 {
380 
381           if (gtask->ta_flags & TASK_ENQUEUED)
382                     STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
383           gtask->ta_flags &= ~TASK_ENQUEUED;
384           return (task_is_running(queue, gtask) ? EBUSY : 0);
385 }
386 
387 int
gtaskqueue_cancel(struct gtaskqueue * queue,struct gtask * gtask)388 gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
389 {
390           int error;
391 
392           TQ_LOCK(queue);
393           error = gtaskqueue_cancel_locked(queue, gtask);
394           TQ_UNLOCK(queue);
395 
396           return (error);
397 }
398 
399 static void
gtaskqueue_drain_locked(struct gtaskqueue * queue,struct gtask * gtask)400 gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask)
401 {
402           while ((gtask->ta_flags & TASK_ENQUEUED) || task_is_running(queue, gtask))
403                     TQ_SLEEP(queue, gtask, "gtq_drain");
404 }
405 
406 void
gtaskqueue_drain(struct gtaskqueue * queue,struct gtask * gtask)407 gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
408 {
409           TQ_LOCK(queue);
410           gtaskqueue_drain_locked(queue, gtask);
411           TQ_UNLOCK(queue);
412 }
413 
414 void
gtaskqueue_drain_all(struct gtaskqueue * queue)415 gtaskqueue_drain_all(struct gtaskqueue *queue)
416 {
417 
418           TQ_LOCK(queue);
419           gtaskqueue_drain_tq_queue(queue);
420           gtaskqueue_drain_tq_active(queue);
421           TQ_UNLOCK(queue);
422 }
423 
424 static int __printflike(4, 0)
_gtaskqueue_start_threads(struct gtaskqueue ** tqp,int count,int pri,const char * name,__va_list ap)425 _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
426                                 const char *name, __va_list ap)
427 {
428           char ktname[MAXCOMLEN + 1];
429           struct thread *td;
430           struct gtaskqueue *tq;
431           int i, error;
432 
433           if (count <= 0)
434                     return (EINVAL);
435 
436           kvsnprintf(ktname, sizeof(ktname), name, ap);
437           tq = *tqp;
438 
439           tq->tq_threads = kmalloc(sizeof(struct thread *) * count,
440                                          M_GTASKQUEUE, M_WAITOK | M_ZERO);
441 
442           for (i = 0; i < count; i++) {
443                     int cpu = i % ncpus;
444                     if (count == 1) {
445                               error = lwkt_create(gtaskqueue_thread_loop, tqp,
446                                                       &tq->tq_threads[i], NULL,
447                                                       TDF_NOSTART, cpu,
448                                                       "%s", ktname);
449                     } else {
450                               error = lwkt_create(gtaskqueue_thread_loop, tqp,
451                                                       &tq->tq_threads[i], NULL,
452                                                       TDF_NOSTART, cpu,
453                                                       "%s_%d", ktname, i);
454                     }
455                     if (error) {
456                               /* should be ok to continue, taskqueue_free will dtrt */
457                               kprintf("%s: lwkt_create(%s): error %d",
458                                         __func__, ktname, error);
459                               tq->tq_threads[i] = NULL;               /* paranoid */
460                     } else
461                               tq->tq_tcount++;
462           }
463           for (i = 0; i < count; i++) {
464                     if (tq->tq_threads[i] == NULL)
465                               continue;
466                     td = tq->tq_threads[i];
467                     lwkt_setpri_initial(td, pri);
468                     lwkt_schedule(td);
469           }
470 
471           return (0);
472 }
473 
474 static int __printflike(4, 5)
gtaskqueue_start_threads(struct gtaskqueue ** tqp,int count,int pri,const char * name,...)475 gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri,
476                                const char *name, ...)
477 {
478           __va_list ap;
479           int error;
480 
481           __va_start(ap, name);
482           error = _gtaskqueue_start_threads(tqp, count, pri, name, ap);
483           __va_end(ap);
484           return (error);
485 }
486 
487 #if 0
488 static inline void
489 gtaskqueue_run_callback(struct gtaskqueue *tq,
490     enum taskqueue_callback_type cb_type)
491 {
492           taskqueue_callback_fn tq_callback;
493 
494           TQ_ASSERT_UNLOCKED(tq);
495           tq_callback = tq->tq_callbacks[cb_type];
496           if (tq_callback != NULL)
497                     tq_callback(tq->tq_cb_contexts[cb_type]);
498 }
499 #endif
500 
501 static void
gtaskqueue_thread_loop(void * arg)502 gtaskqueue_thread_loop(void *arg)
503 {
504           struct gtaskqueue **tqp, *tq;
505 
506           tqp = arg;
507           tq = *tqp;
508 #if 0
509           gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
510 #endif
511           TQ_LOCK(tq);
512           while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
513                     /* XXX ? */
514                     gtaskqueue_run_locked(tq);
515                     /*
516                      * Because taskqueue_run() can drop tq_mutex, we need to
517                      * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the
518                      * meantime, which means we missed a wakeup.
519                      */
520                     if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0)
521                               break;
522                     TQ_SLEEP(tq, tq, "-");
523           }
524           gtaskqueue_run_locked(tq);
525           /*
526            * This thread is on its way out, so just drop the lock temporarily
527            * in order to call the shutdown callback.  This allows the callback
528            * to look at the taskqueue, even just before it dies.
529            */
530 #if 0
531           TQ_UNLOCK(tq);
532           gtaskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN);
533           TQ_LOCK(tq);
534 #endif
535 
536           /* rendezvous with thread that asked us to terminate */
537           tq->tq_tcount--;
538           wakeup_one(tq->tq_threads);
539           TQ_UNLOCK(tq);
540           lwkt_exit();
541 }
542 
543 static void
gtaskqueue_thread_enqueue(void * context)544 gtaskqueue_thread_enqueue(void *context)
545 {
546           struct gtaskqueue **tqp, *tq;
547 
548           tqp = context;
549           tq = *tqp;
550           wakeup_one(tq);
551 }
552 
553 /*
554  * NOTE: FreeBSD uses MTX_SPIN locks, which doesn't make a whole lot
555  *         of sense (over-use of spin-locks in general). In DFly we
556  *         want to use blockable locks for almost everything.
557  */
558 static struct gtaskqueue *
gtaskqueue_create_fast(const char * name,int mflags,taskqueue_enqueue_fn enqueue,void * context)559 gtaskqueue_create_fast(const char *name, int mflags,
560                            taskqueue_enqueue_fn enqueue, void *context)
561 {
562           return _gtaskqueue_create(name, mflags, enqueue, context,
563                                           0, "fast_taskqueue");
564 }
565 
566 struct taskqgroup_cpu {
567           LIST_HEAD(, grouptask) tgc_tasks;
568           struct gtaskqueue *tgc_taskq;
569           int                 tgc_cnt;
570           int                 tgc_cpu;
571 };
572 
573 struct taskqgroup {
574           struct taskqgroup_cpu tqg_queue[MAXCPU];
575           struct lock         tqg_lock;
576           const char *        tqg_name;
577           int                 tqg_cnt;
578 };
579 
580 struct taskq_bind_task {
581           struct gtask bt_task;
582           int       bt_cpuid;
583 };
584 
585 static void
taskqgroup_cpu_create(struct taskqgroup * qgroup,int idx,int cpu)586 taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
587 {
588           struct taskqgroup_cpu *qcpu;
589 
590           qcpu = &qgroup->tqg_queue[idx];
591           LIST_INIT(&qcpu->tgc_tasks);
592           qcpu->tgc_taskq = gtaskqueue_create_fast(NULL, M_WAITOK,
593                                                              gtaskqueue_thread_enqueue,
594                                                              &qcpu->tgc_taskq);
595           gtaskqueue_start_threads(&qcpu->tgc_taskq, 1, TDPRI_KERN_DAEMON,
596                                          "%s_%d", qgroup->tqg_name, idx);
597           qcpu->tgc_cpu = cpu;
598 }
599 
600 /*
601  * Find the taskq with least # of tasks that doesn't currently have any
602  * other queues from the uniq identifier.
603  */
604 static int
taskqgroup_find(struct taskqgroup * qgroup,void * uniq)605 taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
606 {
607           struct grouptask *n;
608           int i, idx, mincnt;
609           int strict;
610 
611           KKASSERT(lockstatus(&qgroup->tqg_lock, NULL) != 0);
612           KASSERT(qgroup->tqg_cnt != 0,
613               ("qgroup %s has no queues", qgroup->tqg_name));
614 
615           /*
616            * Two passes: first scan for a queue with the least tasks that
617            * does not already service this uniq id.  If that fails simply find
618            * the queue with the least total tasks.
619            */
620           for (idx = -1, mincnt = INT_MAX, strict = 1; mincnt == INT_MAX;
621               strict = 0) {
622                     for (i = 0; i < qgroup->tqg_cnt; i++) {
623                               if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
624                                         continue;
625                               if (strict) {
626                                         LIST_FOREACH(n, &qgroup->tqg_queue[i].tgc_tasks,
627                                             gt_list)
628                                                   if (n->gt_uniq == uniq)
629                                                             break;
630                                         if (n != NULL)
631                                                   continue;
632                               }
633                               mincnt = qgroup->tqg_queue[i].tgc_cnt;
634                               idx = i;
635                     }
636           }
637           if (idx == -1)
638                     panic("%s: failed to pick a qid.", __func__);
639 
640           return (idx);
641 }
642 
643 void
taskqgroup_attach(struct taskqgroup * qgroup,struct grouptask * gtask,void * uniq,device_t dev,struct resource * irq,const char * name)644 taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
645     void *uniq, device_t dev, struct resource *irq, const char *name)
646 {
647           int cpu, qid, error;
648 
649           KASSERT(qgroup->tqg_cnt > 0,
650               ("qgroup %s has no queues", qgroup->tqg_name));
651 
652           gtask->gt_uniq = uniq;
653           ksnprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
654           gtask->gt_dev = dev;
655           gtask->gt_irq = irq;
656           gtask->gt_cpu = -1;
657           lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE);
658           qid = taskqgroup_find(qgroup, uniq);
659           qgroup->tqg_queue[qid].tgc_cnt++;
660           LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
661           gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
662           if (dev != NULL && irq != NULL) {
663                     cpu = qgroup->tqg_queue[qid].tgc_cpu;
664                     gtask->gt_cpu = cpu;
665                     lockmgr(&qgroup->tqg_lock, LK_RELEASE);
666 #if 0
667                     /*
668                      * XXX FreeBSD created a mess by separating out the cpu
669                      * binding from bus_setup_intr().  Punt for now.
670                      */
671                     error = bus_bind_intr(dev, irq, cpu);
672 #endif
673                     error = 0;
674 
675                     if (error)
676                               kprintf("%s: binding interrupt failed for %s: %d\n",
677                                   __func__, gtask->gt_name, error);
678           } else {
679                     lockmgr(&qgroup->tqg_lock, LK_RELEASE);
680           }
681 }
682 
683 int
taskqgroup_attach_cpu(struct taskqgroup * qgroup,struct grouptask * gtask,void * uniq,int cpu,device_t dev,struct resource * irq,const char * name)684 taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
685     void *uniq, int cpu, device_t dev, struct resource *irq, const char *name)
686 {
687           int i, qid, error;
688 
689           gtask->gt_uniq = uniq;
690           ksnprintf(gtask->gt_name, GROUPTASK_NAMELEN, "%s", name ? name : "grouptask");
691           gtask->gt_dev = dev;
692           gtask->gt_irq = irq;
693           gtask->gt_cpu = cpu;
694           lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE);
695           for (i = 0, qid = -1; i < qgroup->tqg_cnt; i++) {
696                     if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
697                               qid = i;
698                               break;
699                     }
700           }
701           if (qid == -1) {
702                     lockmgr(&qgroup->tqg_lock, LK_RELEASE);
703                     kprintf("%s: qid not found for %s cpu=%d\n",
704                               __func__, gtask->gt_name, cpu);
705                     return (EINVAL);
706           }
707           qgroup->tqg_queue[qid].tgc_cnt++;
708           LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
709           gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
710           cpu = qgroup->tqg_queue[qid].tgc_cpu;
711           lockmgr(&qgroup->tqg_lock, LK_RELEASE);
712 
713           if (dev != NULL && irq != NULL) {
714 #if 0
715                     /*
716                      * XXX FreeBSD created a mess by separating out the cpu
717                      * binding from bus_setup_intr().  Punt for now.
718                      */
719                     error = bus_bind_intr(dev, irq, cpu);
720 #endif
721                     error = 0;
722 
723                     if (error) {
724                               kprintf("%s: binding interrupt failed for %s: %d\n",
725                                   __func__, gtask->gt_name, error);
726                     }
727           }
728           return (0);
729 }
730 
731 void
taskqgroup_detach(struct taskqgroup * qgroup,struct grouptask * gtask)732 taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
733 {
734           int i;
735 
736           grouptask_block(gtask);
737           lockmgr(&qgroup->tqg_lock, LK_EXCLUSIVE);
738           for (i = 0; i < qgroup->tqg_cnt; i++)
739                     if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
740                               break;
741           if (i == qgroup->tqg_cnt)
742                     panic("%s: task %s not in group", __func__, gtask->gt_name);
743           qgroup->tqg_queue[i].tgc_cnt--;
744           LIST_REMOVE(gtask, gt_list);
745           lockmgr(&qgroup->tqg_lock, LK_RELEASE);
746           gtask->gt_taskqueue = NULL;
747           gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE;
748 }
749 
750 static void
taskqgroup_binder(void * ctx)751 taskqgroup_binder(void *ctx)
752 {
753           struct taskq_bind_task *gtask;
754 
755           gtask = ctx;
756           lwkt_migratecpu(gtask->bt_cpuid);
757           kfree(gtask, M_DEVBUF);
758 }
759 
760 void
taskqgroup_bind(struct taskqgroup * qgroup)761 taskqgroup_bind(struct taskqgroup *qgroup)
762 {
763           struct taskq_bind_task *gtask;
764           int i;
765 
766           /*
767            * Bind taskqueue threads to specific CPUs, if they have been assigned
768            * one.
769            */
770           if (qgroup->tqg_cnt == 1)
771                     return;
772 
773           for (i = 0; i < qgroup->tqg_cnt; i++) {
774                     gtask = kmalloc(sizeof(*gtask), M_DEVBUF, M_WAITOK);
775                     GTASK_INIT(&gtask->bt_task, 0, 0, taskqgroup_binder, gtask);
776                     gtask->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
777                     grouptaskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
778                                                &gtask->bt_task);
779           }
780 }
781 
782 struct taskqgroup *
taskqgroup_create(const char * name,int cnt,int stride)783 taskqgroup_create(const char *name, int cnt, int stride)
784 {
785           struct taskqgroup *qgroup;
786           int cpu, i, j;
787 
788           qgroup = kmalloc(sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
789           lockinit(&qgroup->tqg_lock, "taskqgroup", 0, 0);
790           qgroup->tqg_name = name;
791           qgroup->tqg_cnt = cnt;
792 
793           for (cpu = i = 0; i < cnt; i++) {
794                     taskqgroup_cpu_create(qgroup, i, cpu);
795                     for (j = 0; j < stride; j++)
796                               cpu = (cpu + 1) % ncpus;
797           }
798           return (qgroup);
799 }
800 
801 void
taskqgroup_destroy(struct taskqgroup * qgroup)802 taskqgroup_destroy(struct taskqgroup *qgroup)
803 {
804 }
805 
806 void
taskqgroup_drain_all(struct taskqgroup * tqg)807 taskqgroup_drain_all(struct taskqgroup *tqg)
808 {
809           struct gtaskqueue *q;
810 
811           for (int i = 0; i < ncpus; i++) {
812                     q = tqg->tqg_queue[i].tgc_taskq;
813                     if (q == NULL)
814                               continue;
815                     gtaskqueue_drain_all(q);
816           }
817 }
818