1 /*        $NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $ */
2 
3 /*-
4  * Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
5  * All rights reserved.
6  *
7  * This code is derived from software contributed to The NetBSD Foundation
8  * by Taylor R. Campbell and Jason R. Thorpe.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
20  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
21  * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
22  * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
23  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31 
32 /*
33  * Thread pools.
34  *
35  * A thread pool is a collection of worker threads idle or running
36  * jobs, together with a dispatcher thread that does not run jobs but
37  * can be given jobs to assign to a worker thread.  Scheduling a job in
38  * a thread pool does not allocate or even sleep at all, except perhaps
39  * on an adaptive lock, unlike kthread_create.  Jobs reuse threads, so
40  * they do not incur the expense of creating and destroying kthreads
41  * unless there is not much work to be done.
42  *
43  * A per-CPU thread pool (threadpool_percpu) is a collection of thread
44  * pools, one per CPU bound to that CPU.  For each priority level in
45  * use, there is one shared unbound thread pool (i.e., pool of threads
46  * not bound to any CPU) and one shared per-CPU thread pool.
47  *
48  * To use the unbound thread pool at priority pri, call
49  * threadpool_get(&pool, pri).  When you're done, call
50  * threadpool_put(pool, pri).
51  *
52  * To use the per-CPU thread pools at priority pri, call
53  * threadpool_percpu_get(&pool_percpu, pri), and then use the thread
54  * pool returned by threadpool_percpu_ref(pool_percpu) for the current
55  * CPU, or by threadpool_percpu_ref_remote(pool_percpu, ci) for another
56  * CPU.  When you're done, call threadpool_percpu_put(pool_percpu,
57  * pri).
58  *
59  * +--MACHINE-----------------------------------------------------+
60  * | +--CPU 0---------+ +--CPU 1---------+     +--CPU n---------+ |
61  * | | <dispatcher 0> | | <dispatcher 1> | ... | <dispatcher n> | |
62  * | | <idle 0a>      | | <running 1a>   | ... | <idle na>      | |
63  * | | <running 0b>   | | <running 1b>   | ... | <idle nb>      | |
64  * | | .              | | .              | ... | .              | |
65  * | | .              | | .              | ... | .              | |
66  * | | .              | | .              | ... | .              | |
67  * | +----------------+ +----------------+     +----------------+ |
68  * |            +--unbound-----------+                            |
69  * |            | <dispatcher n+1>   |                            |
70  * |            | <idle (n+1)a>      |                            |
71  * |            | <running (n+1)b>   |                            |
72  * |            +--------------------+                            |
73  * +--------------------------------------------------------------+
74  *
75  * XXX Why one dispatcher per CPU?  I did that originally to avoid
76  * touching remote CPUs' memory when scheduling a job, but that still
77  * requires interprocessor synchronization.  Perhaps we could get by
78  * with a single dispatcher thread, at the expense of another pointer
79  * in struct threadpool_job to identify the CPU on which it must run in
80  * order for the dispatcher to schedule it correctly.
81  */
82 
83 #include <sys/cdefs.h>
84 __KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.23 2021/01/23 16:33:49 riastradh Exp $");
85 
86 #include <sys/types.h>
87 #include <sys/param.h>
88 #include <sys/atomic.h>
89 #include <sys/condvar.h>
90 #include <sys/cpu.h>
91 #include <sys/kernel.h>
92 #include <sys/kmem.h>
93 #include <sys/kthread.h>
94 #include <sys/mutex.h>
95 #include <sys/once.h>
96 #include <sys/percpu.h>
97 #include <sys/pool.h>
98 #include <sys/proc.h>
99 #include <sys/queue.h>
100 #include <sys/sdt.h>
101 #include <sys/sysctl.h>
102 #include <sys/systm.h>
103 #include <sys/threadpool.h>
104 
105 /* Probes */
106 
107 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get,
108     "pri_t"/*pri*/);
109 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__create,
110     "pri_t"/*pri*/);
111 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, get__race,
112     "pri_t"/*pri*/);
113 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put,
114     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
115 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, put__destroy,
116     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
117 
118 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get,
119     "pri_t"/*pri*/);
120 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__create,
121     "pri_t"/*pri*/);
122 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, percpu__get__race,
123     "pri_t"/*pri*/);
124 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put,
125     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
126 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, percpu__put__destroy,
127     "struct threadpool *"/*pool*/, "pri_t"/*pri*/);
128 
129 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, create,
130     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/);
131 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__success,
132     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "struct threadpool *"/*pool*/);
133 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, create__failure,
134     "struct cpu_info *"/*ci*/, "pri_t"/*pri*/, "int"/*error*/);
135 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, destroy,
136     "struct threadpool *"/*pool*/);
137 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, destroy__wait,
138     "struct threadpool *"/*pool*/, "uint64_t"/*refcnt*/);
139 
140 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job,
141     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
142 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running,
143     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
144 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__dispatcher,
145     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
146 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread,
147     "struct threadpool *"/*pool*/,
148     "struct threadpool_job *"/*job*/,
149     "struct lwp *"/*thread*/);
150 
151 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__start,
152     "struct threadpool *"/*pool*/);
153 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__dying,
154     "struct threadpool *"/*pool*/);
155 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__spawn,
156     "struct threadpool *"/*pool*/);
157 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, dispatcher__race,
158     "struct threadpool *"/*pool*/,
159     "struct threadpool_job *"/*job*/);
160 SDT_PROBE_DEFINE3(sdt, kernel, threadpool, dispatcher__assign,
161     "struct threadpool *"/*pool*/,
162     "struct threadpool_job *"/*job*/,
163     "struct lwp *"/*thread*/);
164 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__exit,
165     "struct threadpool *"/*pool*/);
166 
167 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start,
168     "struct threadpool *"/*pool*/);
169 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__dying,
170     "struct threadpool *"/*pool*/);
171 SDT_PROBE_DEFINE2(sdt, kernel, threadpool, thread__job,
172     "struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
173 SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__exit,
174     "struct threadpool *"/*pool*/);
175 
176 /* Data structures */
177 
178 TAILQ_HEAD(job_head, threadpool_job);
179 TAILQ_HEAD(thread_head, threadpool_thread);
180 
181 struct threadpool_thread {
182           struct lwp                              *tpt_lwp;
183           char                                    *tpt_lwp_savedname;
184           struct threadpool             *tpt_pool;
185           struct threadpool_job                   *tpt_job;
186           kcondvar_t                              tpt_cv;
187           TAILQ_ENTRY(threadpool_thread)          tpt_entry;
188 };
189 
190 struct threadpool {
191           kmutex_t                      tp_lock;
192           struct threadpool_thread      tp_dispatcher;
193           struct job_head                         tp_jobs;
194           struct thread_head            tp_idle_threads;
195           uint64_t                      tp_refcnt;
196           int                                     tp_flags;
197 #define   THREADPOOL_DYING    0x01
198           struct cpu_info                         *tp_cpu;
199           pri_t                                   tp_pri;
200 };
201 
202 static void         threadpool_hold(struct threadpool *);
203 static void         threadpool_rele(struct threadpool *);
204 
205 static int          threadpool_percpu_create(struct threadpool_percpu **, pri_t);
206 static void         threadpool_percpu_destroy(struct threadpool_percpu *);
207 static void         threadpool_percpu_init(void *, void *, struct cpu_info *);
208 static void         threadpool_percpu_ok(void *, void *, struct cpu_info *);
209 static void         threadpool_percpu_fini(void *, void *, struct cpu_info *);
210 
211 static threadpool_job_fn_t threadpool_job_dead;
212 
213 static void         threadpool_job_hold(struct threadpool_job *);
214 static void         threadpool_job_rele(struct threadpool_job *);
215 
216 static void         threadpool_dispatcher_thread(void *) __dead;
217 static void         threadpool_thread(void *) __dead;
218 
219 static pool_cache_t threadpool_thread_pc __read_mostly;
220 
221 static kmutex_t               threadpools_lock __cacheline_aligned;
222 
223           /* Default to 30 second idle timeout for pool threads. */
224 static int          threadpool_idle_time_ms = 30 * 1000;
225 
226 struct threadpool_unbound {
227           struct threadpool             tpu_pool;
228 
229           /* protected by threadpools_lock */
230           LIST_ENTRY(threadpool_unbound)          tpu_link;
231           uint64_t                      tpu_refcnt;
232 };
233 
234 static LIST_HEAD(, threadpool_unbound) unbound_threadpools;
235 
236 static struct threadpool_unbound *
threadpool_lookup_unbound(pri_t pri)237 threadpool_lookup_unbound(pri_t pri)
238 {
239           struct threadpool_unbound *tpu;
240 
241           LIST_FOREACH(tpu, &unbound_threadpools, tpu_link) {
242                     if (tpu->tpu_pool.tp_pri == pri)
243                               return tpu;
244           }
245           return NULL;
246 }
247 
248 static void
threadpool_insert_unbound(struct threadpool_unbound * tpu)249 threadpool_insert_unbound(struct threadpool_unbound *tpu)
250 {
251           KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == NULL);
252           LIST_INSERT_HEAD(&unbound_threadpools, tpu, tpu_link);
253 }
254 
255 static void
threadpool_remove_unbound(struct threadpool_unbound * tpu)256 threadpool_remove_unbound(struct threadpool_unbound *tpu)
257 {
258           KASSERT(threadpool_lookup_unbound(tpu->tpu_pool.tp_pri) == tpu);
259           LIST_REMOVE(tpu, tpu_link);
260 }
261 
262 struct threadpool_percpu {
263           percpu_t *                              tpp_percpu;
264           pri_t                                   tpp_pri;
265 
266           /* protected by threadpools_lock */
267           LIST_ENTRY(threadpool_percpu) tpp_link;
268           uint64_t                      tpp_refcnt;
269 };
270 
271 static LIST_HEAD(, threadpool_percpu) percpu_threadpools;
272 
273 static struct threadpool_percpu *
threadpool_lookup_percpu(pri_t pri)274 threadpool_lookup_percpu(pri_t pri)
275 {
276           struct threadpool_percpu *tpp;
277 
278           LIST_FOREACH(tpp, &percpu_threadpools, tpp_link) {
279                     if (tpp->tpp_pri == pri)
280                               return tpp;
281           }
282           return NULL;
283 }
284 
285 static void
threadpool_insert_percpu(struct threadpool_percpu * tpp)286 threadpool_insert_percpu(struct threadpool_percpu *tpp)
287 {
288           KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == NULL);
289           LIST_INSERT_HEAD(&percpu_threadpools, tpp, tpp_link);
290 }
291 
292 static void
threadpool_remove_percpu(struct threadpool_percpu * tpp)293 threadpool_remove_percpu(struct threadpool_percpu *tpp)
294 {
295           KASSERT(threadpool_lookup_percpu(tpp->tpp_pri) == tpp);
296           LIST_REMOVE(tpp, tpp_link);
297 }
298 
299 static int
sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)300 sysctl_kern_threadpool_idle_ms(SYSCTLFN_ARGS)
301 {
302           struct sysctlnode node;
303           int val, error;
304 
305           node = *rnode;
306 
307           val = threadpool_idle_time_ms;
308           node.sysctl_data = &val;
309           error = sysctl_lookup(SYSCTLFN_CALL(&node));
310           if (error == 0 && newp != NULL) {
311                     /* Disallow negative values and 0 (forever). */
312                     if (val < 1)
313                               error = EINVAL;
314                     else
315                               threadpool_idle_time_ms = val;
316           }
317 
318           return error;
319 }
320 
321 SYSCTL_SETUP_PROTO(sysctl_threadpool_setup);
322 
323 SYSCTL_SETUP(sysctl_threadpool_setup,
324     "sysctl kern.threadpool subtree setup")
325 {
326           const struct sysctlnode *rnode, *cnode;
327           int error __diagused;
328 
329           error = sysctl_createv(clog, 0, NULL, &rnode,
330               CTLFLAG_PERMANENT,
331               CTLTYPE_NODE, "threadpool",
332               SYSCTL_DESCR("threadpool subsystem options"),
333               NULL, 0, NULL, 0,
334               CTL_KERN, CTL_CREATE, CTL_EOL);
335           KASSERT(error == 0);
336 
337           error = sysctl_createv(clog, 0, &rnode, &cnode,
338               CTLFLAG_PERMANENT | CTLFLAG_READWRITE,
339               CTLTYPE_INT, "idle_ms",
340               SYSCTL_DESCR("idle thread timeout in ms"),
341               sysctl_kern_threadpool_idle_ms, 0, NULL, 0,
342               CTL_CREATE, CTL_EOL);
343           KASSERT(error == 0);
344 }
345 
346 void
threadpools_init(void)347 threadpools_init(void)
348 {
349 
350           threadpool_thread_pc =
351               pool_cache_init(sizeof(struct threadpool_thread), 0, 0, 0,
352                     "thplthrd", NULL, IPL_NONE, NULL, NULL, NULL);
353 
354           LIST_INIT(&unbound_threadpools);
355           LIST_INIT(&percpu_threadpools);
356           mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
357 }
358 
359 static void
threadnamesuffix(char * buf,size_t buflen,struct cpu_info * ci,int pri)360 threadnamesuffix(char *buf, size_t buflen, struct cpu_info *ci, int pri)
361 {
362 
363           buf[0] = '\0';
364           if (ci)
365                     snprintf(buf + strlen(buf), buflen - strlen(buf), "/%d",
366                         cpu_index(ci));
367           if (pri != PRI_NONE)
368                     snprintf(buf + strlen(buf), buflen - strlen(buf), "@%d", pri);
369 }
370 
371 /* Thread pool creation */
372 
373 static bool
threadpool_pri_is_valid(pri_t pri)374 threadpool_pri_is_valid(pri_t pri)
375 {
376           return (pri == PRI_NONE || (pri >= PRI_USER && pri < PRI_COUNT));
377 }
378 
379 static int
threadpool_create(struct threadpool * const pool,struct cpu_info * ci,pri_t pri)380 threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
381     pri_t pri)
382 {
383           struct lwp *lwp;
384           char suffix[16];
385           int ktflags;
386           int error;
387 
388           KASSERT(threadpool_pri_is_valid(pri));
389 
390           SDT_PROBE2(sdt, kernel, threadpool, create,  ci, pri);
391 
392           mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
393           /* XXX dispatcher */
394           TAILQ_INIT(&pool->tp_jobs);
395           TAILQ_INIT(&pool->tp_idle_threads);
396           pool->tp_refcnt = 1;                    /* dispatcher's reference */
397           pool->tp_flags = 0;
398           pool->tp_cpu = ci;
399           pool->tp_pri = pri;
400 
401           pool->tp_dispatcher.tpt_lwp = NULL;
402           pool->tp_dispatcher.tpt_pool = pool;
403           pool->tp_dispatcher.tpt_job = NULL;
404           cv_init(&pool->tp_dispatcher.tpt_cv, "pooldisp");
405 
406           ktflags = 0;
407           ktflags |= KTHREAD_MPSAFE;
408           if (pri < PRI_KERNEL)
409                     ktflags |= KTHREAD_TS;
410           threadnamesuffix(suffix, sizeof(suffix), ci, pri);
411           error = kthread_create(pri, ktflags, ci, &threadpool_dispatcher_thread,
412               &pool->tp_dispatcher, &lwp, "pooldisp%s", suffix);
413           if (error)
414                     goto fail0;
415 
416           mutex_spin_enter(&pool->tp_lock);
417           pool->tp_dispatcher.tpt_lwp = lwp;
418           cv_broadcast(&pool->tp_dispatcher.tpt_cv);
419           mutex_spin_exit(&pool->tp_lock);
420 
421           SDT_PROBE3(sdt, kernel, threadpool, create__success,  ci, pri, pool);
422           return 0;
423 
424 fail0:    KASSERT(error);
425           KASSERT(pool->tp_dispatcher.tpt_job == NULL);
426           KASSERT(pool->tp_dispatcher.tpt_pool == pool);
427           KASSERT(pool->tp_flags == 0);
428           KASSERT(pool->tp_refcnt == 0);
429           KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
430           KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
431           KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
432           cv_destroy(&pool->tp_dispatcher.tpt_cv);
433           mutex_destroy(&pool->tp_lock);
434           SDT_PROBE3(sdt, kernel, threadpool, create__failure,  ci, pri, error);
435           return error;
436 }
437 
438 /* Thread pool destruction */
439 
440 static void
threadpool_destroy(struct threadpool * pool)441 threadpool_destroy(struct threadpool *pool)
442 {
443           struct threadpool_thread *thread;
444 
445           SDT_PROBE1(sdt, kernel, threadpool, destroy,  pool);
446 
447           /* Mark the pool dying and wait for threads to commit suicide.  */
448           mutex_spin_enter(&pool->tp_lock);
449           KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
450           pool->tp_flags |= THREADPOOL_DYING;
451           cv_broadcast(&pool->tp_dispatcher.tpt_cv);
452           TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
453                     cv_broadcast(&thread->tpt_cv);
454           while (0 < pool->tp_refcnt) {
455                     SDT_PROBE2(sdt, kernel, threadpool, destroy__wait,
456                         pool, pool->tp_refcnt);
457                     cv_wait(&pool->tp_dispatcher.tpt_cv, &pool->tp_lock);
458           }
459           mutex_spin_exit(&pool->tp_lock);
460 
461           KASSERT(pool->tp_dispatcher.tpt_job == NULL);
462           KASSERT(pool->tp_dispatcher.tpt_pool == pool);
463           KASSERT(pool->tp_flags == THREADPOOL_DYING);
464           KASSERT(pool->tp_refcnt == 0);
465           KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
466           KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
467           KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
468           cv_destroy(&pool->tp_dispatcher.tpt_cv);
469           mutex_destroy(&pool->tp_lock);
470 }
471 
472 static void
threadpool_hold(struct threadpool * pool)473 threadpool_hold(struct threadpool *pool)
474 {
475 
476           KASSERT(mutex_owned(&pool->tp_lock));
477           pool->tp_refcnt++;
478           KASSERT(pool->tp_refcnt != 0);
479 }
480 
481 static void
threadpool_rele(struct threadpool * pool)482 threadpool_rele(struct threadpool *pool)
483 {
484 
485           KASSERT(mutex_owned(&pool->tp_lock));
486           KASSERT(0 < pool->tp_refcnt);
487           if (--pool->tp_refcnt == 0)
488                     cv_broadcast(&pool->tp_dispatcher.tpt_cv);
489 }
490 
491 /* Unbound thread pools */
492 
493 int
threadpool_get(struct threadpool ** poolp,pri_t pri)494 threadpool_get(struct threadpool **poolp, pri_t pri)
495 {
496           struct threadpool_unbound *tpu, *tmp = NULL;
497           int error;
498 
499           ASSERT_SLEEPABLE();
500 
501           SDT_PROBE1(sdt, kernel, threadpool, get,  pri);
502 
503           if (! threadpool_pri_is_valid(pri))
504                     return EINVAL;
505 
506           mutex_enter(&threadpools_lock);
507           tpu = threadpool_lookup_unbound(pri);
508           if (tpu == NULL) {
509                     mutex_exit(&threadpools_lock);
510                     SDT_PROBE1(sdt, kernel, threadpool, get__create,  pri);
511                     tmp = kmem_zalloc(sizeof(*tmp), KM_SLEEP);
512                     error = threadpool_create(&tmp->tpu_pool, NULL, pri);
513                     if (error) {
514                               kmem_free(tmp, sizeof(*tmp));
515                               return error;
516                     }
517                     mutex_enter(&threadpools_lock);
518                     tpu = threadpool_lookup_unbound(pri);
519                     if (tpu == NULL) {
520                               tpu = tmp;
521                               tmp = NULL;
522                               threadpool_insert_unbound(tpu);
523                     } else {
524                               SDT_PROBE1(sdt, kernel, threadpool, get__race,  pri);
525                     }
526           }
527           KASSERT(tpu != NULL);
528           tpu->tpu_refcnt++;
529           KASSERT(tpu->tpu_refcnt != 0);
530           mutex_exit(&threadpools_lock);
531 
532           if (tmp != NULL) {
533                     threadpool_destroy(&tmp->tpu_pool);
534                     kmem_free(tmp, sizeof(*tmp));
535           }
536           KASSERT(tpu != NULL);
537           *poolp = &tpu->tpu_pool;
538           return 0;
539 }
540 
541 void
threadpool_put(struct threadpool * pool,pri_t pri)542 threadpool_put(struct threadpool *pool, pri_t pri)
543 {
544           struct threadpool_unbound *tpu =
545               container_of(pool, struct threadpool_unbound, tpu_pool);
546 
547           ASSERT_SLEEPABLE();
548           KASSERT(threadpool_pri_is_valid(pri));
549 
550           SDT_PROBE2(sdt, kernel, threadpool, put,  pool, pri);
551 
552           mutex_enter(&threadpools_lock);
553           KASSERT(tpu == threadpool_lookup_unbound(pri));
554           KASSERT(0 < tpu->tpu_refcnt);
555           if (--tpu->tpu_refcnt == 0) {
556                     SDT_PROBE2(sdt, kernel, threadpool, put__destroy,  pool, pri);
557                     threadpool_remove_unbound(tpu);
558           } else {
559                     tpu = NULL;
560           }
561           mutex_exit(&threadpools_lock);
562 
563           if (tpu) {
564                     threadpool_destroy(&tpu->tpu_pool);
565                     kmem_free(tpu, sizeof(*tpu));
566           }
567 }
568 
569 /* Per-CPU thread pools */
570 
571 int
threadpool_percpu_get(struct threadpool_percpu ** pool_percpup,pri_t pri)572 threadpool_percpu_get(struct threadpool_percpu **pool_percpup, pri_t pri)
573 {
574           struct threadpool_percpu *pool_percpu, *tmp = NULL;
575           int error;
576 
577           ASSERT_SLEEPABLE();
578 
579           SDT_PROBE1(sdt, kernel, threadpool, percpu__get,  pri);
580 
581           if (! threadpool_pri_is_valid(pri))
582                     return EINVAL;
583 
584           mutex_enter(&threadpools_lock);
585           pool_percpu = threadpool_lookup_percpu(pri);
586           if (pool_percpu == NULL) {
587                     mutex_exit(&threadpools_lock);
588                     SDT_PROBE1(sdt, kernel, threadpool, percpu__get__create,  pri);
589                     error = threadpool_percpu_create(&tmp, pri);
590                     if (error)
591                               return error;
592                     KASSERT(tmp != NULL);
593                     mutex_enter(&threadpools_lock);
594                     pool_percpu = threadpool_lookup_percpu(pri);
595                     if (pool_percpu == NULL) {
596                               pool_percpu = tmp;
597                               tmp = NULL;
598                               threadpool_insert_percpu(pool_percpu);
599                     } else {
600                               SDT_PROBE1(sdt, kernel, threadpool, percpu__get__race,
601                                   pri);
602                     }
603           }
604           KASSERT(pool_percpu != NULL);
605           pool_percpu->tpp_refcnt++;
606           KASSERT(pool_percpu->tpp_refcnt != 0);
607           mutex_exit(&threadpools_lock);
608 
609           if (tmp != NULL)
610                     threadpool_percpu_destroy(tmp);
611           KASSERT(pool_percpu != NULL);
612           *pool_percpup = pool_percpu;
613           return 0;
614 }
615 
616 void
threadpool_percpu_put(struct threadpool_percpu * pool_percpu,pri_t pri)617 threadpool_percpu_put(struct threadpool_percpu *pool_percpu, pri_t pri)
618 {
619 
620           ASSERT_SLEEPABLE();
621 
622           KASSERT(threadpool_pri_is_valid(pri));
623 
624           SDT_PROBE2(sdt, kernel, threadpool, percpu__put,  pool_percpu, pri);
625 
626           mutex_enter(&threadpools_lock);
627           KASSERT(pool_percpu == threadpool_lookup_percpu(pri));
628           KASSERT(0 < pool_percpu->tpp_refcnt);
629           if (--pool_percpu->tpp_refcnt == 0) {
630                     SDT_PROBE2(sdt, kernel, threadpool, percpu__put__destroy,
631                         pool_percpu, pri);
632                     threadpool_remove_percpu(pool_percpu);
633           } else {
634                     pool_percpu = NULL;
635           }
636           mutex_exit(&threadpools_lock);
637 
638           if (pool_percpu)
639                     threadpool_percpu_destroy(pool_percpu);
640 }
641 
642 struct threadpool *
threadpool_percpu_ref(struct threadpool_percpu * pool_percpu)643 threadpool_percpu_ref(struct threadpool_percpu *pool_percpu)
644 {
645           struct threadpool **poolp, *pool;
646 
647           poolp = percpu_getref(pool_percpu->tpp_percpu);
648           pool = *poolp;
649           percpu_putref(pool_percpu->tpp_percpu);
650 
651           return pool;
652 }
653 
654 struct threadpool *
threadpool_percpu_ref_remote(struct threadpool_percpu * pool_percpu,struct cpu_info * ci)655 threadpool_percpu_ref_remote(struct threadpool_percpu *pool_percpu,
656     struct cpu_info *ci)
657 {
658           struct threadpool **poolp, *pool;
659 
660           /*
661            * As long as xcalls are blocked -- e.g., by kpreempt_disable
662            * -- the percpu object will not be swapped and destroyed.  We
663            * can't write to it, because the data may have already been
664            * moved to a new buffer, but we can safely read from it.
665            */
666           kpreempt_disable();
667           poolp = percpu_getptr_remote(pool_percpu->tpp_percpu, ci);
668           pool = *poolp;
669           kpreempt_enable();
670 
671           return pool;
672 }
673 
674 static int
threadpool_percpu_create(struct threadpool_percpu ** pool_percpup,pri_t pri)675 threadpool_percpu_create(struct threadpool_percpu **pool_percpup, pri_t pri)
676 {
677           struct threadpool_percpu *pool_percpu;
678           bool ok = true;
679 
680           pool_percpu = kmem_zalloc(sizeof(*pool_percpu), KM_SLEEP);
681           pool_percpu->tpp_pri = pri;
682           pool_percpu->tpp_percpu = percpu_create(sizeof(struct threadpool *),
683               threadpool_percpu_init, threadpool_percpu_fini,
684               (void *)(intptr_t)pri);
685 
686           /*
687            * Verify that all of the CPUs were initialized.
688            *
689            * XXX What to do if we add CPU hotplug?
690            */
691           percpu_foreach(pool_percpu->tpp_percpu, &threadpool_percpu_ok, &ok);
692           if (!ok)
693                     goto fail;
694 
695           /* Success!  */
696           *pool_percpup = (struct threadpool_percpu *)pool_percpu;
697           return 0;
698 
699 fail:     percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
700           kmem_free(pool_percpu, sizeof(*pool_percpu));
701           return ENOMEM;
702 }
703 
704 static void
threadpool_percpu_destroy(struct threadpool_percpu * pool_percpu)705 threadpool_percpu_destroy(struct threadpool_percpu *pool_percpu)
706 {
707 
708           percpu_free(pool_percpu->tpp_percpu, sizeof(struct threadpool *));
709           kmem_free(pool_percpu, sizeof(*pool_percpu));
710 }
711 
712 static void
threadpool_percpu_init(void * vpoolp,void * vpri,struct cpu_info * ci)713 threadpool_percpu_init(void *vpoolp, void *vpri, struct cpu_info *ci)
714 {
715           struct threadpool **const poolp = vpoolp;
716           pri_t pri = (intptr_t)(void *)vpri;
717           int error;
718 
719           *poolp = kmem_zalloc(sizeof(**poolp), KM_SLEEP);
720           error = threadpool_create(*poolp, ci, pri);
721           if (error) {
722                     KASSERT(error == ENOMEM);
723                     kmem_free(*poolp, sizeof(**poolp));
724                     *poolp = NULL;
725           }
726 }
727 
728 static void
threadpool_percpu_ok(void * vpoolp,void * vokp,struct cpu_info * ci)729 threadpool_percpu_ok(void *vpoolp, void *vokp, struct cpu_info *ci)
730 {
731           struct threadpool **const poolp = vpoolp;
732           bool *okp = vokp;
733 
734           if (*poolp == NULL)
735                     atomic_store_relaxed(okp, false);
736 }
737 
738 static void
threadpool_percpu_fini(void * vpoolp,void * vprip,struct cpu_info * ci)739 threadpool_percpu_fini(void *vpoolp, void *vprip, struct cpu_info *ci)
740 {
741           struct threadpool **const poolp = vpoolp;
742 
743           if (*poolp == NULL) /* initialization failed */
744                     return;
745           threadpool_destroy(*poolp);
746           kmem_free(*poolp, sizeof(**poolp));
747 }
748 
749 /* Thread pool jobs */
750 
751 void __printflike(4,5)
threadpool_job_init(struct threadpool_job * job,threadpool_job_fn_t fn,kmutex_t * lock,const char * fmt,...)752 threadpool_job_init(struct threadpool_job *job, threadpool_job_fn_t fn,
753     kmutex_t *lock, const char *fmt, ...)
754 {
755           va_list ap;
756 
757           va_start(ap, fmt);
758           (void)vsnprintf(job->job_name, sizeof(job->job_name), fmt, ap);
759           va_end(ap);
760 
761           job->job_lock = lock;
762           job->job_thread = NULL;
763           job->job_refcnt = 0;
764           cv_init(&job->job_cv, job->job_name);
765           job->job_fn = fn;
766 }
767 
768 static void
threadpool_job_dead(struct threadpool_job * job)769 threadpool_job_dead(struct threadpool_job *job)
770 {
771 
772           panic("threadpool job %p ran after destruction", job);
773 }
774 
775 void
threadpool_job_destroy(struct threadpool_job * job)776 threadpool_job_destroy(struct threadpool_job *job)
777 {
778 
779           ASSERT_SLEEPABLE();
780 
781           KASSERTMSG((job->job_thread == NULL), "job %p still running", job);
782 
783           mutex_enter(job->job_lock);
784           while (0 < atomic_load_relaxed(&job->job_refcnt))
785                     cv_wait(&job->job_cv, job->job_lock);
786           mutex_exit(job->job_lock);
787 
788           job->job_lock = NULL;
789           KASSERT(job->job_thread == NULL);
790           KASSERT(job->job_refcnt == 0);
791           KASSERT(!cv_has_waiters(&job->job_cv));
792           cv_destroy(&job->job_cv);
793           job->job_fn = threadpool_job_dead;
794           (void)strlcpy(job->job_name, "deadjob", sizeof(job->job_name));
795 }
796 
797 static void
threadpool_job_hold(struct threadpool_job * job)798 threadpool_job_hold(struct threadpool_job *job)
799 {
800           unsigned int refcnt __diagused;
801 
802           refcnt = atomic_inc_uint_nv(&job->job_refcnt);
803           KASSERT(refcnt != 0);
804 }
805 
806 static void
threadpool_job_rele(struct threadpool_job * job)807 threadpool_job_rele(struct threadpool_job *job)
808 {
809           unsigned int refcnt;
810 
811           KASSERT(mutex_owned(job->job_lock));
812 
813           refcnt = atomic_dec_uint_nv(&job->job_refcnt);
814           KASSERT(refcnt != UINT_MAX);
815           if (refcnt == 0)
816                     cv_broadcast(&job->job_cv);
817 }
818 
819 void
threadpool_job_done(struct threadpool_job * job)820 threadpool_job_done(struct threadpool_job *job)
821 {
822 
823           KASSERT(mutex_owned(job->job_lock));
824           KASSERT(job->job_thread != NULL);
825           KASSERT(job->job_thread->tpt_lwp == curlwp);
826 
827           /*
828            * We can safely read this field; it's only modified right before
829            * we call the job work function, and we are only preserving it
830            * to use here; no one cares if it contains junk afterward.
831            */
832           lwp_lock(curlwp);
833           curlwp->l_name = job->job_thread->tpt_lwp_savedname;
834           lwp_unlock(curlwp);
835 
836           /*
837            * Inline the work of threadpool_job_rele(); the job is already
838            * locked, the most likely scenario (XXXJRT only scenario?) is
839            * that we're dropping the last reference (the one taken in
840            * threadpool_schedule_job()), and we always do the cv_broadcast()
841            * anyway.
842            */
843           KASSERT(0 < atomic_load_relaxed(&job->job_refcnt));
844           unsigned int refcnt __diagused = atomic_dec_uint_nv(&job->job_refcnt);
845           KASSERT(refcnt != UINT_MAX);
846           cv_broadcast(&job->job_cv);
847           job->job_thread = NULL;
848 }
849 
850 void
threadpool_schedule_job(struct threadpool * pool,struct threadpool_job * job)851 threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
852 {
853 
854           KASSERT(mutex_owned(job->job_lock));
855 
856           SDT_PROBE2(sdt, kernel, threadpool, schedule__job,  pool, job);
857 
858           /*
859            * If the job's already running, let it keep running.  The job
860            * is guaranteed by the interlock not to end early -- if it had
861            * ended early, threadpool_job_done would have set job_thread
862            * to NULL under the interlock.
863            */
864           if (__predict_true(job->job_thread != NULL)) {
865                     SDT_PROBE2(sdt, kernel, threadpool, schedule__job__running,
866                         pool, job);
867                     return;
868           }
869 
870           threadpool_job_hold(job);
871 
872           /* Otherwise, try to assign a thread to the job.  */
873           mutex_spin_enter(&pool->tp_lock);
874           if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
875                     /* Nobody's idle.  Give it to the dispatcher.  */
876                     SDT_PROBE2(sdt, kernel, threadpool, schedule__job__dispatcher,
877                         pool, job);
878                     job->job_thread = &pool->tp_dispatcher;
879                     TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
880           } else {
881                     /* Assign it to the first idle thread.  */
882                     job->job_thread = TAILQ_FIRST(&pool->tp_idle_threads);
883                     SDT_PROBE3(sdt, kernel, threadpool, schedule__job__thread,
884                         pool, job, job->job_thread->tpt_lwp);
885                     TAILQ_REMOVE(&pool->tp_idle_threads, job->job_thread,
886                         tpt_entry);
887                     job->job_thread->tpt_job = job;
888           }
889 
890           /* Notify whomever we gave it to, dispatcher or idle thread.  */
891           KASSERT(job->job_thread != NULL);
892           cv_broadcast(&job->job_thread->tpt_cv);
893           mutex_spin_exit(&pool->tp_lock);
894 }
895 
896 bool
threadpool_cancel_job_async(struct threadpool * pool,struct threadpool_job * job)897 threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
898 {
899 
900           KASSERT(mutex_owned(job->job_lock));
901 
902           /*
903            * XXXJRT This fails (albeit safely) when all of the following
904            * are true:
905            *
906            *        => "pool" is something other than what the job was
907            *           scheduled on.  This can legitimately occur if,
908            *           for example, a job is percpu-scheduled on CPU0
909            *           and then CPU1 attempts to cancel it without taking
910            *           a remote pool reference.  (this might happen by
911            *           "luck of the draw").
912            *
913            *        => "job" is not yet running, but is assigned to the
914            *           dispatcher.
915            *
916            * When this happens, this code makes the determination that
917            * the job is already running.  The failure mode is that the
918            * caller is told the job is running, and thus has to wait.
919            * The dispatcher will eventually get to it and the job will
920            * proceed as if it had been already running.
921            */
922 
923           if (job->job_thread == NULL) {
924                     /* Nothing to do.  Guaranteed not running.  */
925                     return true;
926           } else if (job->job_thread == &pool->tp_dispatcher) {
927                     /* Take it off the list to guarantee it won't run.  */
928                     job->job_thread = NULL;
929                     mutex_spin_enter(&pool->tp_lock);
930                     TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
931                     mutex_spin_exit(&pool->tp_lock);
932                     threadpool_job_rele(job);
933                     return true;
934           } else {
935                     /* Too late -- already running.  */
936                     return false;
937           }
938 }
939 
940 void
threadpool_cancel_job(struct threadpool * pool,struct threadpool_job * job)941 threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
942 {
943 
944           /*
945            * We may sleep here, but we can't ASSERT_SLEEPABLE() because
946            * the job lock (used to interlock the cv_wait()) may in fact
947            * legitimately be a spin lock, so the assertion would fire
948            * as a false-positive.
949            */
950 
951           KASSERT(mutex_owned(job->job_lock));
952 
953           if (threadpool_cancel_job_async(pool, job))
954                     return;
955 
956           /* Already running.  Wait for it to complete.  */
957           while (job->job_thread != NULL)
958                     cv_wait(&job->job_cv, job->job_lock);
959 }
960 
961 /* Thread pool dispatcher thread */
962 
963 static void __dead
threadpool_dispatcher_thread(void * arg)964 threadpool_dispatcher_thread(void *arg)
965 {
966           struct threadpool_thread *const dispatcher = arg;
967           struct threadpool *const pool = dispatcher->tpt_pool;
968           struct lwp *lwp = NULL;
969           int ktflags;
970           char suffix[16];
971           int error;
972 
973           KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
974           KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
975 
976           /* Wait until we're initialized.  */
977           mutex_spin_enter(&pool->tp_lock);
978           while (dispatcher->tpt_lwp == NULL)
979                     cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
980 
981           SDT_PROBE1(sdt, kernel, threadpool, dispatcher__start,  pool);
982 
983           for (;;) {
984                     /* Wait until there's a job.  */
985                     while (TAILQ_EMPTY(&pool->tp_jobs)) {
986                               if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
987                                         SDT_PROBE1(sdt, kernel, threadpool,
988                                             dispatcher__dying,  pool);
989                                         break;
990                               }
991                               cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
992                     }
993                     if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
994                               break;
995 
996                     /* If there are no threads, we'll have to try to start one.  */
997                     if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
998                               SDT_PROBE1(sdt, kernel, threadpool, dispatcher__spawn,
999                                   pool);
1000                               threadpool_hold(pool);
1001                               mutex_spin_exit(&pool->tp_lock);
1002 
1003                               struct threadpool_thread *const thread =
1004                                   pool_cache_get(threadpool_thread_pc, PR_WAITOK);
1005                               thread->tpt_lwp = NULL;
1006                               thread->tpt_pool = pool;
1007                               thread->tpt_job = NULL;
1008                               cv_init(&thread->tpt_cv, "pooljob");
1009 
1010                               ktflags = 0;
1011                               ktflags |= KTHREAD_MPSAFE;
1012                               if (pool->tp_pri < PRI_KERNEL)
1013                                         ktflags |= KTHREAD_TS;
1014                               threadnamesuffix(suffix, sizeof(suffix), pool->tp_cpu,
1015                                   pool->tp_pri);
1016                               error = kthread_create(pool->tp_pri, ktflags,
1017                                   pool->tp_cpu, &threadpool_thread, thread, &lwp,
1018                                   "poolthread%s", suffix);
1019 
1020                               mutex_spin_enter(&pool->tp_lock);
1021                               if (error) {
1022                                         pool_cache_put(threadpool_thread_pc, thread);
1023                                         threadpool_rele(pool);
1024                                         /* XXX What to do to wait for memory?  */
1025                                         (void)kpause("thrdplcr", false, hz,
1026                                             &pool->tp_lock);
1027                                         continue;
1028                               }
1029                               /*
1030                                * New kthread now owns the reference to the pool
1031                                * taken above.
1032                                */
1033                               KASSERT(lwp != NULL);
1034                               TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread,
1035                                   tpt_entry);
1036                               thread->tpt_lwp = lwp;
1037                               lwp = NULL;
1038                               cv_broadcast(&thread->tpt_cv);
1039                               continue;
1040                     }
1041 
1042                     /* There are idle threads, so try giving one a job.  */
1043                     struct threadpool_job *const job = TAILQ_FIRST(&pool->tp_jobs);
1044 
1045                     /*
1046                      * Take an extra reference on the job temporarily so that
1047                      * it won't disappear on us while we have both locks dropped.
1048                      */
1049                     threadpool_job_hold(job);
1050                     mutex_spin_exit(&pool->tp_lock);
1051 
1052                     mutex_enter(job->job_lock);
1053                     /* If the job was cancelled, we'll no longer be its thread.  */
1054                     if (__predict_true(job->job_thread == dispatcher)) {
1055                               mutex_spin_enter(&pool->tp_lock);
1056                               TAILQ_REMOVE(&pool->tp_jobs, job, job_entry);
1057                               if (__predict_false(
1058                                             TAILQ_EMPTY(&pool->tp_idle_threads))) {
1059                                         /*
1060                                          * Someone else snagged the thread
1061                                          * first.  We'll have to try again.
1062                                          */
1063                                         SDT_PROBE2(sdt, kernel, threadpool,
1064                                             dispatcher__race,  pool, job);
1065                                         TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
1066                                             job_entry);
1067                               } else {
1068                                         /*
1069                                          * Assign the job to the thread and
1070                                          * wake the thread so it starts work.
1071                                          */
1072                                         struct threadpool_thread *const thread =
1073                                             TAILQ_FIRST(&pool->tp_idle_threads);
1074 
1075                                         SDT_PROBE2(sdt, kernel, threadpool,
1076                                             dispatcher__assign,  job, thread->tpt_lwp);
1077                                         KASSERT(thread->tpt_job == NULL);
1078                                         TAILQ_REMOVE(&pool->tp_idle_threads, thread,
1079                                             tpt_entry);
1080                                         thread->tpt_job = job;
1081                                         job->job_thread = thread;
1082                                         cv_broadcast(&thread->tpt_cv);
1083                               }
1084                               mutex_spin_exit(&pool->tp_lock);
1085                     }
1086                     threadpool_job_rele(job);
1087                     mutex_exit(job->job_lock);
1088 
1089                     mutex_spin_enter(&pool->tp_lock);
1090           }
1091           threadpool_rele(pool);
1092           mutex_spin_exit(&pool->tp_lock);
1093 
1094           SDT_PROBE1(sdt, kernel, threadpool, dispatcher__exit,  pool);
1095 
1096           kthread_exit(0);
1097 }
1098 
1099 /* Thread pool thread */
1100 
1101 static void __dead
threadpool_thread(void * arg)1102 threadpool_thread(void *arg)
1103 {
1104           struct threadpool_thread *const thread = arg;
1105           struct threadpool *const pool = thread->tpt_pool;
1106 
1107           KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
1108           KASSERT((pool->tp_cpu == NULL) || (curlwp->l_pflag & LP_BOUND));
1109 
1110           /* Wait until we're initialized and on the queue.  */
1111           mutex_spin_enter(&pool->tp_lock);
1112           while (thread->tpt_lwp == NULL)
1113                     cv_wait(&thread->tpt_cv, &pool->tp_lock);
1114 
1115           SDT_PROBE1(sdt, kernel, threadpool, thread__start,  pool);
1116 
1117           KASSERT(thread->tpt_lwp == curlwp);
1118           for (;;) {
1119                     /* Wait until we are assigned a job.  */
1120                     while (thread->tpt_job == NULL) {
1121                               if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
1122                                         SDT_PROBE1(sdt, kernel, threadpool,
1123                                             thread__dying,  pool);
1124                                         break;
1125                               }
1126                               if (cv_timedwait(&thread->tpt_cv, &pool->tp_lock,
1127                                         mstohz(threadpool_idle_time_ms)))
1128                                         break;
1129                     }
1130                     if (__predict_false(thread->tpt_job == NULL)) {
1131                               TAILQ_REMOVE(&pool->tp_idle_threads, thread,
1132                                   tpt_entry);
1133                               break;
1134                     }
1135 
1136                     struct threadpool_job *const job = thread->tpt_job;
1137                     KASSERT(job != NULL);
1138 
1139                     /* Set our lwp name to reflect what job we're doing.  */
1140                     lwp_lock(curlwp);
1141                     char *const lwp_name __diagused = curlwp->l_name;
1142                     thread->tpt_lwp_savedname = curlwp->l_name;
1143                     curlwp->l_name = job->job_name;
1144                     lwp_unlock(curlwp);
1145 
1146                     mutex_spin_exit(&pool->tp_lock);
1147 
1148                     SDT_PROBE2(sdt, kernel, threadpool, thread__job,  pool, job);
1149 
1150                     /* Run the job.  */
1151                     (*job->job_fn)(job);
1152 
1153                     /* lwp name restored in threadpool_job_done(). */
1154                     KASSERTMSG((curlwp->l_name == lwp_name),
1155                         "someone forgot to call threadpool_job_done()!");
1156 
1157                     /*
1158                      * We can compare pointers, but we can no longer deference
1159                      * job after this because threadpool_job_done() drops the
1160                      * last reference on the job while the job is locked.
1161                      */
1162 
1163                     mutex_spin_enter(&pool->tp_lock);
1164                     KASSERT(thread->tpt_job == job);
1165                     thread->tpt_job = NULL;
1166                     TAILQ_INSERT_TAIL(&pool->tp_idle_threads, thread, tpt_entry);
1167           }
1168           threadpool_rele(pool);
1169           mutex_spin_exit(&pool->tp_lock);
1170 
1171           SDT_PROBE1(sdt, kernel, threadpool, thread__exit,  pool);
1172 
1173           KASSERT(!cv_has_waiters(&thread->tpt_cv));
1174           cv_destroy(&thread->tpt_cv);
1175           pool_cache_put(threadpool_thread_pc, thread);
1176           kthread_exit(0);
1177 }
1178