1 /*-
2 * Copyright (c) 2009-2014, Stacey Son <sson@freebsd.org>
3 * Copyright (c) 2000-2008, Apple Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice unmodified, this list of conditions, and the following
11 * disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 *
27 * $FreeBSD: $
28 */
29
30 #include "namespace.h"
31 #include <sys/types.h>
32 #include <sys/queue.h>
33 #include <sys/rtprio.h>
34 #include <sys/signalvar.h>
35 #include <sys/thrworkq.h>
36 #include <errno.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <stddef.h>
40 #include <pthread.h>
41 #include <pthread_np.h>
42 #include <pthread_workqueue.h>
43 #include "un-namespace.h"
44
45 #include "thr_private.h"
46
/*
 * One unit of work submitted to a workqueue.  The same TAILQ linkage is used
 * while the item sits on a workqueue's list and while it sits in the free
 * pool.
 */
typedef struct _pthread_workitem {
	TAILQ_ENTRY(_pthread_workitem) item_entry; /* pthread_workitem
						      list in prio */
	void *(*func)(void *);		/* routine to run for this item */
	void *func_arg;			/* argument passed to func */
	struct _pthread_workqueue *workq; /* workqueue the item was added to */
	unsigned int flags;		/* PTH_WQITEM_* bits */
	unsigned int gencount;		/* bumped each time the item is freed */
} *pthread_workitem_t;
56
/*
 * Head of the list of workqueues that share one priority level.
 */
typedef struct _pthread_workqueue_head {
	TAILQ_HEAD(, _pthread_workqueue) wqhead; /* workqueues at this prio */
	struct _pthread_workqueue * next_workq;	/* next workqueue to service */
} * pthread_workqueue_head_t;
61
/*
 * A user-visible workqueue.  Field order is part of the layout and must not
 * be changed.
 */
struct _pthread_workqueue {
	struct pthread *mthread;	/* thread that initialized the queue */
	unsigned int sig;	/* Unique signature for this structure */
	struct umutex lock;	/* Used for internal mutex on structure */
	TAILQ_ENTRY(_pthread_workqueue) wq_list; /* workqueue list in prio */
	TAILQ_HEAD(, _pthread_workitem) item_listhead; /* pthread_workitem
							  list in prio */
	TAILQ_HEAD(, _pthread_workitem) item_kernhead; /* pthread_workitem
							  list in prio */
	unsigned int flags;	/* PTHREAD_WORKQ_* bits */
	size_t stacksize;	/* not referenced in this file */
	int istimeshare;	/* not referenced in this file */
	int importance;		/* not referenced in this file */
	int affinity;		/* XXX - not used yet */
	int queueprio;		/* index into __pthread_wq_head_tbl */
	int barrier_count;	/* in-kernel items a barrier still waits for */
	int kq_count;		/* items handed to the kernel, not yet done */
	void (*term_callback)(struct _pthread_workqueue *,void *);
				/* called when the queue is torn down */
	void *term_callarg;	/* argument for term_callback */
	pthread_workqueue_head_t headp;	/* list head for queueprio */
	int overcommit;		/* nonzero: post with WORKQUEUE_OVERCOMMIT */
#if ! defined(__x86_64__)
	/* NOTE(review): presumably padding/reserved space -- confirm. */
	unsigned int rev2[11];
#endif
};
87
/*
 * Workqueue flags (stored in struct _pthread_workqueue's flags field).
 */
#define	PTHREAD_WORKQ_IN_CREATION	0x0001
#define	PTHREAD_WORKQ_IN_TERMINATE	0x0002
#define	PTHREAD_WORKQ_BARRIER_ON	0x0004
#define	PTHREAD_WORKQ_TERM_ON		0x0008
#define	PTHREAD_WORKQ_DESTROYED		0x0010
#define	PTHREAD_WORKQ_REQUEUED		0x0020
#define	PTHREAD_WORKQ_SUSPEND		0x0040

/*
 * Workitem flags (stored in struct _pthread_workitem's flags field).
 */
#define	PTH_WQITEM_INKERNEL_QUEUE	0x0001
#define	PTH_WQITEM_RUNNING		0x0002
#define	PTH_WQITEM_COMPLETED		0x0004
#define	PTH_WQITEM_REMOVED		0x0008
#define	PTH_WQITEM_BARRIER		0x0010
#define	PTH_WQITEM_DESTROY		0x0020
#define	PTH_WQITEM_NOTINLIST		0x0040
#define	PTH_WQITEM_APPLIED		0x0080
#define	PTH_WQITEM_KERN_COUNT		0x0100

/*
 * Signatures/magic numbers.  Note that the workqueue and the attribute
 * signatures currently share the same value.
 */
#define	PTHREAD_WORKQUEUE_SIG		0xBEBEBEBE
#define	PTHREAD_WORKQUEUE_ATTR_SIG	0xBEBEBEBE

/*
 * Memory pool sizes: number of work items and workqueues preallocated by
 * _pthread_work_internal_init().
 */
#define	WORKITEM_POOL_SIZE		1000
#define	WORKQUEUE_POOL_SIZE		100
123
/* Spin lock taken by workqueue_list_lock()/workqueue_list_unlock(). */
static pthread_spinlock_t __workqueue_list_lock;
/* Nonzero once _pthread_work_internal_init() has completed. */
static int kernel_workq_setup = 0;
/* Id filled in by thr_workq(WQOPS_INIT) and passed to later thr_workq() ops. */
static int workq_id = 0;
static int wqreadyprio = 0; /* current highest prio queue ready with items */
static pthread_workqueue_attr_t _pthread_wq_attr_default = {
	.sig = 0,
	.queueprio = 0,
	.overcommit = 0,
};
/* Items currently posted to the kernel that have not started running. */
static volatile int32_t kernel_workq_count = 0;
/* Workqueues currently handed out by alloc_workqueue(). */
static volatile int32_t user_workq_count = 0;

/* Free pools of workqueues and work items. */
static TAILQ_HEAD(__pthread_workqueue_pool, _pthread_workqueue)
	__pthread_workqueue_pool_head = TAILQ_HEAD_INITIALIZER(
	    __pthread_workqueue_pool_head);
static TAILQ_HEAD(__pthread_workitem_pool, _pthread_workitem)
	__pthread_workitem_pool_head = TAILQ_HEAD_INITIALIZER(
	    __pthread_workitem_pool_head);


/* One workqueue list head per priority level, indexed by queueprio. */
struct _pthread_workqueue_head __pthread_workq0_head;
struct _pthread_workqueue_head __pthread_workq1_head;
struct _pthread_workqueue_head __pthread_workq2_head;
pthread_workqueue_head_t __pthread_wq_head_tbl[WORKQ_OS_NUMPRIOS] = {
	&__pthread_workq0_head,
	&__pthread_workq1_head,
	&__pthread_workq2_head
};
152
/* File-local helpers. */
static void workqueue_list_lock(void);
static void workqueue_list_unlock(void);
static pthread_workitem_t alloc_workitem(void);
static void free_workitem(pthread_workitem_t witem);
static pthread_workqueue_t alloc_workqueue(void);
static void free_workqueue(pthread_workqueue_t wq);
static void _pthread_wqthread(void *arg);
static void _pthread_newthread(void *arg);
static void _pthread_exitthread(void *arg);
static void pick_nextworkqueue_droplock(void);

/* Entry points exported (through weak references) without the '_' prefix. */
int _pthread_workqueue_init_np(void);
int _pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp);
int _pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr);
int _pthread_workqueue_attr_getqueuepriority_np(
	const pthread_workqueue_attr_t * attr, int * qpriop);
int _pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr,
	int qprio);
int _pthread_workqueue_attr_getovercommit_np(
	const pthread_workqueue_attr_t * attr, int * ocommp);
int _pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr,
	int ocomm);
int _pthread_workqueue_create_np(pthread_workqueue_t * workqp,
	const pthread_workqueue_attr_t * attr);
int _pthread_workqueue_additem_np(pthread_workqueue_t workq,
	void *( *workitem_func)(void *), void * workitem_arg,
	pthread_workitem_handle_t * itemhandlep,
	unsigned int *gencountp);
int _pthread_workqueue_requestconcurrency_np(int queue,
	int request_concurrency);
int _pthread_workqueue_getovercommit_np(pthread_workqueue_t workq,
	unsigned int *ocommp);
void _pthread_workqueue_atfork_prepare(void);
void _pthread_workqueue_atfork_parent(void);
void _pthread_workqueue_atfork_child(void);
188
/*
 * Publish each _pthread_workqueue_*() implementation under its public
 * pthread_workqueue_*() name as a weak symbol.
 */
__weak_reference(_pthread_workqueue_init_np, pthread_workqueue_init_np);
__weak_reference(_pthread_workqueue_attr_init_np,
	pthread_workqueue_attr_init_np);
__weak_reference(_pthread_workqueue_attr_destroy_np,
	pthread_workqueue_attr_destroy_np);
__weak_reference(_pthread_workqueue_attr_getqueuepriority_np,
	pthread_workqueue_attr_getqueuepriority_np);
__weak_reference(_pthread_workqueue_attr_setqueuepriority_np,
	pthread_workqueue_attr_setqueuepriority_np);
__weak_reference(_pthread_workqueue_attr_getovercommit_np,
	pthread_workqueue_attr_getovercommit_np);
__weak_reference(_pthread_workqueue_attr_setovercommit_np,
	pthread_workqueue_attr_setovercommit_np);
__weak_reference(_pthread_workqueue_getovercommit_np,
	pthread_workqueue_getovercommit_np);
__weak_reference(_pthread_workqueue_create_np, pthread_workqueue_create_np);
__weak_reference(_pthread_workqueue_additem_np, pthread_workqueue_additem_np);
__weak_reference(_pthread_workqueue_requestconcurrency_np,
	pthread_workqueue_requestconcurrency_np);
__weak_reference(_pthread_workqueue_atfork_prepare,
	pthread_workqueue_atfork_prepare);
__weak_reference(_pthread_workqueue_atfork_parent,
	pthread_workqueue_atfork_parent);
__weak_reference(_pthread_workqueue_atfork_child,
	pthread_workqueue_atfork_child);
214
/*
 * dispatch_atfork_prepare(), dispatch_atfork_parent() and
 * dispatch_atfork_child() are provided by libdispatch, which may not be
 * linked.  They are declared weak so that their addresses compare equal to
 * zero when libdispatch is absent; callers must test before calling.
 */
__attribute__((weak)) void dispatch_atfork_prepare(void);
__attribute__((weak)) void dispatch_atfork_parent(void);
__attribute__((weak)) void dispatch_atfork_child(void);
222
/*
 * Atomic increment/decrement of an integer counter.  GCC 4.2 and later use
 * the __sync builtins; older compilers fall back to atomic_fetchadd_int().
 * The two variants differ in the value they yield (new vs. old), so the
 * result of these macros must not be relied upon.
 */
#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2)
#define	ATOMIC_INC(p)	__sync_add_and_fetch((p), 1)
#define	ATOMIC_DEC(p)	__sync_sub_and_fetch((p), 1)
#else
#define	ATOMIC_INC(p)	atomic_fetchadd_int(p, 1)
#define	ATOMIC_DEC(p)	atomic_fetchadd_int(p, -1)
#endif
230
231 static void
workqueue_list_lock(void)232 workqueue_list_lock(void)
233 {
234
235 _pthread_spin_lock(&__workqueue_list_lock);
236 }
237
238 static void
workqueue_list_unlock(void)239 workqueue_list_unlock(void)
240 {
241
242 _pthread_spin_unlock(&__workqueue_list_lock);
243 }
244
245 /*
246 * Round up size to the nearest multiple of _thr_page_size.
247 */
248 static size_t
round_up(size_t size)249 round_up(size_t size)
250 {
251
252 if (size % _thr_page_size != 0)
253 size = ((size / _thr_page_size) + 1) *
254 _thr_page_size;
255 return (size);
256 }
257
258 static int
thr_workq_init(int * retid,struct pthread_attr * attr)259 thr_workq_init(int *retid, struct pthread_attr *attr)
260 {
261 struct twq_param twq;
262
263 twq.twq_retid = retid;
264 twq.twq_workqfunc = _pthread_wqthread;
265 twq.twq_newtdfunc = _pthread_newthread;
266 twq.twq_exitfunc = _pthread_exitthread;
267 twq.twq_stacksize = round_up(attr->stacksize_attr);
268 twq.twq_guardsize = round_up(attr->guardsize_attr);
269
270 return (thr_workq(WQOPS_INIT, &twq));
271 }
272
273 static int
thr_workq_thread_return(void)274 thr_workq_thread_return(void)
275 {
276 struct twq_param twq;
277
278 twq.twq_id = workq_id;
279 return (thr_workq(WQOPS_THREAD_RETURN, &twq));
280 }
281
282 static int
thr_workq_queue_add(pthread_workitem_t witem,int affinity,int prio)283 thr_workq_queue_add(pthread_workitem_t witem, int affinity, int prio)
284 {
285 struct twq_param twq;
286
287 twq.twq_id = workq_id;
288 twq.twq_add_item = (void *)witem;
289 twq.twq_add_affin = affinity;
290 twq.twq_add_prio = prio;
291
292 return (thr_workq(WQOPS_QUEUE_ADD, &twq));
293 }
294
295 static int
thr_workq_thread_setconc(int queue,int request_concurrency)296 thr_workq_thread_setconc(int queue, int request_concurrency)
297 {
298 struct twq_param twq;
299
300 twq.twq_id = workq_id;
301 twq.twq_setconc_prio = queue;
302 twq.twq_setconc_conc = request_concurrency;
303
304 return (thr_workq(WQOPS_THREAD_SETCONC, &twq));
305 }
306
/*
 * Common tail for a worker thread that has finished running 'item'.
 *
 * Removes the item from workq's kernel list and returns it to the pool.  If
 * a barrier is active on the queue and this was the last item it was waiting
 * for, the barrier item at the front of the pending list is retired (running
 * its callback, if any) and, when termination was requested, the workqueue
 * itself is unlinked and freed.  Finally more work is posted, the thread is
 * returned to the kernel and the thread exits; this function does not
 * return.
 *
 * The 'self' argument is not used by the body.
 */
static void
workqueue_exit(pthread_t self, pthread_workqueue_t workq,
    pthread_workitem_t item)
{
	pthread_workitem_t baritem;
	pthread_workqueue_head_t headp;
	void (*func)(pthread_workqueue_t, void *);

	workqueue_list_lock();
	TAILQ_REMOVE(&workq->item_kernhead, item, item_entry);
	workq->kq_count--;

	item->flags = 0;
	free_workitem(item);

	if ((workq->flags & PTHREAD_WORKQ_BARRIER_ON) ==
	    PTHREAD_WORKQ_BARRIER_ON) {
		workq->barrier_count--;

		if (workq->barrier_count <= 0 ) {
			/* Need to remove barrier item from the list. */
			/*
			 * NOTE(review): baritem is dereferenced without a
			 * NULL check; this relies on the barrier/destroy item
			 * still being at the head of the list.
			 */
			baritem = TAILQ_FIRST(&workq->item_listhead);

			/*
			 * If the front item is a barrier and call back is
			 * registered, run that.
			 */
			if (((baritem->flags & PTH_WQITEM_BARRIER) ==
				PTH_WQITEM_BARRIER) && (baritem->func != NULL)){

				/* The callback runs without the list lock. */
				workqueue_list_unlock();
				func = (void (*)(pthread_workqueue_t, void *))
				    baritem->func;
				(*func)(workq, baritem->func_arg);
				workqueue_list_lock();
			}
			TAILQ_REMOVE(&workq->item_listhead, baritem,
			    item_entry);
			baritem->flags = 0;
			free_workitem(baritem);
			workq->flags &= ~PTHREAD_WORKQ_BARRIER_ON;
			if ((workq->flags & PTHREAD_WORKQ_TERM_ON) != 0) {
				/* Termination requested: tear the queue down. */
				headp = __pthread_wq_head_tbl[workq->queueprio];
				workq->flags |= PTHREAD_WORKQ_DESTROYED;
				/*
				 * Advance the round-robin cursor past this
				 * queue, wrapping to the front; clear it if
				 * this was the only queue.
				 */
				if (headp->next_workq == workq) {
					headp->next_workq =
					    TAILQ_NEXT(workq, wq_list);
					if (headp->next_workq == NULL) {
						headp->next_workq =
						    TAILQ_FIRST(&headp->wqhead);
						if (headp->next_workq == workq)
							headp->next_workq=NULL;
					}
				}
				TAILQ_REMOVE(&headp->wqhead, workq, wq_list);
				/* Invalidate the handle for valid_workq(). */
				workq->sig = 0;
				if (workq->term_callback != NULL) {
					workqueue_list_unlock();
					(*workq->term_callback)(workq,
					    workq->term_callarg);
					workqueue_list_lock();
				}
				free_workqueue(workq);
			} else {
				/*
				 * If there are higher prio item then reset
				 * to wqreadyprio.
				 */
				if ((workq->queueprio < wqreadyprio) &&
				    (!(TAILQ_EMPTY(&workq->item_listhead))))
					wqreadyprio = workq->queueprio;
			}
		}
	}

	/* Posts more work if possible and releases the list lock. */
	pick_nextworkqueue_droplock();

	(void)thr_workq_thread_return();

	_pthread_exit(NULL);
}
388
389
390 /* XXX need to compare notes to thr_create()'s version */
391 static void
_pthread_wqthread(void * arg)392 _pthread_wqthread(void *arg)
393 {
394 pthread_workitem_t item = (pthread_workitem_t)arg;
395 pthread_workqueue_t workq;
396 pthread_t self = _pthread_self();
397
398 /* debug serialization */
399 THR_LOCK(self);
400 THR_UNLOCK(self);
401
402 workq = item->workq;
403 ATOMIC_DEC(&kernel_workq_count);
404
405 (*item->func)(item->func_arg);
406
407 workqueue_exit(self, workq, item);
408
409 /* NOT REACHED */
410 }
411
/*
 * Entry point registered with the kernel as the new-thread function
 * (twq_newtdfunc).  'arg' is the work item to run.
 *
 * Builds the userland struct pthread for a kernel-created thread, links it
 * into the process's thread list, runs the work item and then finishes via
 * workqueue_exit().
 */
static void
_pthread_newthread(void *arg)
{
	pthread_workitem_t item = (pthread_workitem_t)arg;
	pthread_workqueue_t workq;
	struct pthread *newthread, *mthread;

	/*
	 * This thread has been initiated by the kernel but we need to allocate
	 * the user land now including the TLS.
	 */

	workq = item->workq;
	mthread = workq->mthread;

	/*
	 * NOTE(review): on allocation failure the thread exits without
	 * running the item or undoing the queue accounting -- confirm this
	 * is acceptable.
	 */
	if ((newthread = _thr_alloc(mthread)) == NULL)
		_pthread_exit(NULL);	/* XXX Return some error code? */

	/*
	 * Init the thread structure.
	 */

	/* Use the default thread attributes. */
	newthread->attr = _pthread_attr_default;

	newthread->magic = THR_MAGIC;
	newthread->start_routine = item->func;
	newthread->arg = item->func_arg;
	newthread->cancel_enable = 1;
	newthread->cancel_async = 0;
	/* Initialize the mutex queue: */
	TAILQ_INIT(&newthread->mutexq);
	TAILQ_INIT(&newthread->pp_mutexq);
	newthread->refcount = 1;

	/*
	 * This thread's stack will be recycled in the kernel so record
	 * its address as NULL.
	 */
	newthread->attr.stackaddr_attr = NULL;

	/*
	 * Get the Thread ID and set the automatic TLS.
	 * XXX It seems we could reduce this to one syscall.
	 */
	(void)thr_self(&newthread->tid);
	_tcb_set(newthread->tcb);

	_thr_link(mthread, newthread);

	if (SHOULD_REPORT_EVENT(mthread, TD_CREATE)) {
		THR_THREAD_LOCK(mthread, newthread);
		_thr_report_creation(mthread, newthread);
		THR_THREAD_UNLOCK(mthread, newthread);
	}

	/* Same lock/unlock pair as in _pthread_wqthread(). */
	THR_LOCK(newthread);
	THR_UNLOCK(newthread);

	/*
	 * Put the new thread to work.
	 */
	ATOMIC_DEC(&kernel_workq_count);

	(*item->func)(item->func_arg);

	workqueue_exit(newthread, workq, item);

	/* NOT REACHED */
}
482
/*
 * Entry point registered with the kernel as the exit function
 * (twq_exitfunc).  A thread started here has nothing to do: we are shutting
 * down, so it exits immediately.  'arg' is ignored.
 */
static void
_pthread_exitthread(void *arg)
{

	(void)arg;
	_pthread_exit(NULL);
}
493
494 static int
_pthread_work_internal_init(void)495 _pthread_work_internal_init(void)
496 {
497 int i;
498 pthread_workqueue_head_t headp;
499 pthread_workitem_t witemp;
500 pthread_workqueue_t wq;
501 pthread_t curthread = _get_curthread();
502
503 if (kernel_workq_setup == 0) {
504
505 _pthread_wq_attr_default.queueprio = WORKQ_DEFAULT_PRIOQUEUE;
506 _pthread_wq_attr_default.sig = PTHREAD_WORKQUEUE_ATTR_SIG;
507
508 for( i = 0; i< WORKQ_OS_NUMPRIOS; i++) {
509 headp = __pthread_wq_head_tbl[i];
510 TAILQ_INIT(&headp->wqhead);
511 headp->next_workq = 0;
512 }
513
514 /* create work item and workqueue pools */
515 witemp = (struct _pthread_workitem *)malloc(
516 sizeof(struct _pthread_workitem) * WORKITEM_POOL_SIZE);
517 if (witemp == NULL)
518 return (ENOMEM);
519 bzero(witemp, (sizeof(struct _pthread_workitem) *
520 WORKITEM_POOL_SIZE));
521 for (i = 0; i < WORKITEM_POOL_SIZE; i++) {
522 TAILQ_INSERT_TAIL(&__pthread_workitem_pool_head,
523 &witemp[i], item_entry);
524 }
525 wq = (struct _pthread_workqueue *)malloc(
526 sizeof(struct _pthread_workqueue) * WORKQUEUE_POOL_SIZE);
527 if (wq == NULL) {
528 free(witemp);
529 return (ENOMEM);
530 }
531 bzero(wq, (sizeof(struct _pthread_workqueue) *
532 WORKQUEUE_POOL_SIZE));
533 for (i = 0; i < WORKQUEUE_POOL_SIZE; i++) {
534 TAILQ_INSERT_TAIL(&__pthread_workqueue_pool_head,
535 &wq[i], wq_list);
536 }
537
538 /* XXX need to use the workqueue attr's. */
539 if (thr_workq_init(&workq_id, &curthread->attr)) {
540 free(wq);
541 free(witemp);
542 return (errno);
543 }
544
545 kernel_workq_setup = 1;
546 }
547
548 return (0);
549 }
550
551 static void
_pthread_workq_init(pthread_workqueue_t wq,const pthread_workqueue_attr_t * attr)552 _pthread_workq_init(pthread_workqueue_t wq,
553 const pthread_workqueue_attr_t * attr)
554 {
555
556 bzero(wq, sizeof(struct _pthread_workqueue));
557 if (attr != NULL) {
558 wq->queueprio = attr->queueprio;
559 wq->overcommit = attr->overcommit;
560 } else {
561 wq->queueprio = WORKQ_DEFAULT_PRIOQUEUE;
562 wq->overcommit = 0;
563 }
564 wq->flags = 0;
565 TAILQ_INIT(&wq->item_listhead);
566 TAILQ_INIT(&wq->item_kernhead);
567 wq->wq_list.tqe_next = 0;
568 wq->wq_list.tqe_prev = 0;
569 wq->sig = PTHREAD_WORKQUEUE_SIG;
570 wq->headp = __pthread_wq_head_tbl[wq->queueprio];
571 wq->mthread = _get_curthread();
572 wq->affinity = -1; /* XXX not used yet. */
573 }
574
575 int
_pthread_workqueue_init_np(void)576 _pthread_workqueue_init_np(void)
577 {
578 int ret;
579
580 _thr_check_init();
581 _pthread_spin_init(&__workqueue_list_lock, PTHREAD_PROCESS_PRIVATE);
582 workqueue_list_lock();
583 /* XXX - _pthread_attr_init(&_pthread_attr_default); */
584 ret =_pthread_work_internal_init();
585 workqueue_list_unlock();
586
587 return(ret);
588 }
589
590 /*
591 * Pthread Workqueue Attributes.
592 */
593 int
_pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp)594 _pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp)
595 {
596
597 attrp->queueprio = WORKQ_DEFAULT_PRIOQUEUE;
598 attrp->sig = PTHREAD_WORKQUEUE_ATTR_SIG;
599 attrp->overcommit = 0;
600 return (0);
601 }
602
603 int
_pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr)604 _pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr)
605 {
606
607 if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG)
608 return (0);
609 else
610 return (EINVAL); /* Not an attribute struct. */
611 }
612
613 int
_pthread_workqueue_attr_getqueuepriority_np(const pthread_workqueue_attr_t * attr,int * qpriop)614 _pthread_workqueue_attr_getqueuepriority_np(
615 const pthread_workqueue_attr_t * attr, int * qpriop)
616 {
617
618 if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
619 *qpriop = attr->queueprio;
620 return (0);
621 } else
622 return (EINVAL); /* Not an atribute struct. */
623 }
624
625 int
_pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr,int qprio)626 _pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr,
627 int qprio)
628 {
629
630 if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
631 switch(qprio) {
632 case WORKQ_HIGH_PRIOQUEUE:
633 case WORKQ_DEFAULT_PRIOQUEUE:
634 case WORKQ_LOW_PRIOQUEUE:
635 attr->queueprio = qprio;
636 return (0);
637 default:
638 return (EINVAL);
639 }
640 } else
641 return (EINVAL);
642 }
643
644 int
_pthread_workqueue_attr_getovercommit_np(const pthread_workqueue_attr_t * attr,int * ocommp)645 _pthread_workqueue_attr_getovercommit_np(const pthread_workqueue_attr_t * attr,
646 int * ocommp)
647 {
648
649 if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
650 *ocommp = attr->overcommit;
651 return (0);
652 } else
653 return (EINVAL); /* Not an attribute struct. */
654 }
655
656 int
_pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr,int ocomm)657 _pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr,
658 int ocomm)
659 {
660
661 if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
662 attr->overcommit = ocomm;
663 return (0);
664 } else
665 return (EINVAL);
666 }
667
668
669 static int
valid_workq(pthread_workqueue_t workq)670 valid_workq(pthread_workqueue_t workq)
671 {
672
673 if (workq->sig == PTHREAD_WORKQUEUE_SIG)
674 return (1);
675 else
676 return (0);
677 }
678
679 int
_pthread_workqueue_getovercommit_np(pthread_workqueue_t workq,unsigned int * ocommp)680 _pthread_workqueue_getovercommit_np(pthread_workqueue_t workq,
681 unsigned int *ocommp)
682 {
683
684 if (valid_workq(workq) == 0)
685 return (EINVAL);
686
687 if (ocommp != NULL)
688 *ocommp = workq->overcommit;
689
690 return (0);
691 }
692
693 /*
694 * Pthread Workqueue support routines.
695 */
696 int
_pthread_workqueue_create_np(pthread_workqueue_t * workqp,const pthread_workqueue_attr_t * attr)697 _pthread_workqueue_create_np(pthread_workqueue_t * workqp,
698 const pthread_workqueue_attr_t * attr)
699 {
700 pthread_workqueue_t wq;
701 pthread_workqueue_head_t headp;
702 int error;
703
704 if ((attr != NULL) && (attr->sig != PTHREAD_WORKQUEUE_ATTR_SIG))
705 return (EINVAL);
706
707 _thr_check_init();
708
709 workqueue_list_lock();
710 if (kernel_workq_setup == 0) {
711 error = _pthread_work_internal_init();
712 if (error) {
713 workqueue_list_unlock();
714 return (error);
715 }
716 }
717
718 wq = alloc_workqueue();
719 if (wq == NULL) {
720 workqueue_list_unlock();
721 return (ENOMEM);
722 }
723
724 _pthread_workq_init(wq, attr);
725
726 headp = __pthread_wq_head_tbl[wq->queueprio];
727 TAILQ_INSERT_TAIL(&headp->wqhead, wq, wq_list);
728 if (headp->next_workq == NULL)
729 headp->next_workq = TAILQ_FIRST(&headp->wqhead);
730 workqueue_list_unlock();
731
732 *workqp = wq;
733
734 return (0);
735 }
736
737 /*
738 * alloc_workitem() is called with the list lock held. It will drop the lock
739 * if we need to actually alocate memory.
740 */
741 static pthread_workitem_t
alloc_workitem(void)742 alloc_workitem(void)
743 {
744 pthread_workitem_t witem;
745
746 if (TAILQ_EMPTY(&__pthread_workitem_pool_head)) {
747 workqueue_list_unlock();
748 witem = malloc(sizeof(struct _pthread_workitem));
749 if (witem == NULL)
750 return (NULL);
751 witem->gencount = 0;
752 workqueue_list_lock();
753 } else {
754 witem = TAILQ_FIRST(&__pthread_workitem_pool_head);
755 TAILQ_REMOVE(&__pthread_workitem_pool_head, witem, item_entry);
756 }
757 return (witem);
758 }
759
760 /*
761 * free_workitem() is called with the list lock held.
762 */
763 static void
free_workitem(pthread_workitem_t witem)764 free_workitem(pthread_workitem_t witem)
765 {
766
767 witem->gencount++;
768 TAILQ_INSERT_TAIL(&__pthread_workitem_pool_head, witem, item_entry);
769 }
770
771 /*
772 * alloc_workqueue() is called with list lock held.
773 */
774 static pthread_workqueue_t
alloc_workqueue(void)775 alloc_workqueue(void)
776 {
777 pthread_workqueue_t wq;
778
779 if (TAILQ_EMPTY(&__pthread_workqueue_pool_head)) {
780 workqueue_list_unlock();
781 wq = malloc(sizeof(struct _pthread_workqueue));
782 if (wq == NULL)
783 return (NULL);
784 workqueue_list_lock();
785 } else {
786 wq = TAILQ_FIRST(&__pthread_workqueue_pool_head);
787 TAILQ_REMOVE(&__pthread_workqueue_pool_head, wq, wq_list);
788 }
789 user_workq_count++;
790 return(wq);
791 }
792
793 /*
794 * free_workqueue() is called with list lock held.
795 */
796 static void
free_workqueue(pthread_workqueue_t wq)797 free_workqueue(pthread_workqueue_t wq)
798 {
799
800 user_workq_count--;
801 TAILQ_INSERT_TAIL(&__pthread_workqueue_pool_head, wq, wq_list);
802 }
803
804 static int
post_nextworkitem(pthread_workqueue_t workq)805 post_nextworkitem(pthread_workqueue_t workq)
806 {
807 int error, prio;
808 pthread_workitem_t witem;
809 pthread_workqueue_head_t headp;
810 void (*func)(pthread_workqueue_t, void *);
811
812 if ((workq->flags & PTHREAD_WORKQ_SUSPEND) == PTHREAD_WORKQ_SUSPEND)
813 return (0);
814
815 if (TAILQ_EMPTY(&workq->item_listhead))
816 return (0);
817
818 if ((workq->flags & PTHREAD_WORKQ_BARRIER_ON) ==
819 PTHREAD_WORKQ_BARRIER_ON)
820 return (0);
821
822 witem = TAILQ_FIRST(&workq->item_listhead);
823 headp = workq->headp;
824
825 if ((witem->flags & PTH_WQITEM_BARRIER) == PTH_WQITEM_BARRIER) {
826 if ((witem->flags & PTH_WQITEM_APPLIED) != 0)
827 return (0);
828
829 /*
830 * Also barrier when nothing needs to be handled and
831 * nothing to wait for.
832 */
833 if (workq->kq_count != 0) {
834 witem->flags |= PTH_WQITEM_APPLIED;
835 workq->flags |= PTHREAD_WORKQ_BARRIER_ON;
836 workq->barrier_count = workq->kq_count;
837
838 return (1);
839 } else {
840 if (witem->func != NULL) {
841 /* We are going to drop list lock. */
842 witem->flags |= PTH_WQITEM_APPLIED;
843 workq->flags |= PTHREAD_WORKQ_BARRIER_ON;
844 workqueue_list_unlock();
845
846 func = (void (*)(pthread_workqueue_t, void *))
847 witem->func;
848 (*func)(workq, witem->func_arg);
849
850 workqueue_list_lock();
851 workq->flags &= ~PTHREAD_WORKQ_BARRIER_ON;
852 }
853 TAILQ_REMOVE(&workq->item_listhead, witem, item_entry);
854 witem->flags = 0;
855 free_workitem(witem);
856
857 return (1);
858 }
859 } else if ((witem->flags & PTH_WQITEM_DESTROY) == PTH_WQITEM_DESTROY) {
860 if ((witem->flags & PTH_WQITEM_APPLIED) != 0)
861 return (0);
862 witem->flags |= PTH_WQITEM_APPLIED;
863 workq->flags |=
864 (PTHREAD_WORKQ_BARRIER_ON | PTHREAD_WORKQ_TERM_ON);
865 workq->barrier_count = workq->kq_count;
866 workq->term_callback =
867 (void (*)(struct _pthread_workqueue *,void *))witem->func;
868 workq->term_callarg = witem->func_arg;
869 TAILQ_REMOVE(&workq->item_listhead, witem, item_entry);
870
871 if ((TAILQ_EMPTY(&workq->item_listhead)) &&
872 (workq->kq_count == 0)) {
873 witem->flags = 0;
874 free_workitem(witem);
875 workq->flags |= PTHREAD_WORKQ_DESTROYED;
876
877 headp = __pthread_wq_head_tbl[workq->queueprio];
878 if (headp->next_workq == workq) {
879 headp->next_workq = TAILQ_NEXT(workq, wq_list);
880 if (headp->next_workq == NULL) {
881 headp->next_workq =
882 TAILQ_FIRST(&headp->wqhead);
883 if (headp->next_workq == workq)
884 headp->next_workq = NULL;
885 }
886 }
887 workq->sig = 0;
888 TAILQ_REMOVE(&headp->wqhead, workq, wq_list);
889 if (workq->term_callback != NULL) {
890 workqueue_list_unlock();
891 (*workq->term_callback)(workq,
892 workq->term_callarg);
893 workqueue_list_lock();
894 }
895 free_workqueue(workq);
896 return (1);
897 } else
898 TAILQ_INSERT_HEAD(&workq->item_listhead, witem,
899 item_entry);
900
901 return (1);
902
903 } else {
904 TAILQ_REMOVE(&workq->item_listhead, witem, item_entry);
905 if ((witem->flags & PTH_WQITEM_KERN_COUNT) == 0) {
906 workq->kq_count++;
907 witem->flags |= PTH_WQITEM_KERN_COUNT;
908 }
909 ATOMIC_INC(&kernel_workq_count);
910
911 workqueue_list_unlock();
912
913 prio = workq->queueprio;
914 if (workq->overcommit != 0)
915 prio |= WORKQUEUE_OVERCOMMIT;
916
917 if ((error = thr_workq_queue_add(witem,
918 workq->affinity, prio)) == -1) {
919 ATOMIC_DEC(&kernel_workq_count);
920
921 workqueue_list_lock();
922 TAILQ_REMOVE(&workq->item_kernhead, witem, item_entry);
923 TAILQ_INSERT_HEAD(&workq->item_listhead, witem,
924 item_entry);
925 if ((workq->flags & (PTHREAD_WORKQ_BARRIER_ON |
926 PTHREAD_WORKQ_TERM_ON)) != 0)
927 workq->flags |= PTHREAD_WORKQ_REQUEUED;
928 } else
929 workqueue_list_lock();
930
931 return (1);
932 }
933
934 /* NOT REACHED. */
935
936 PANIC("Error in logic for post_nextworkitem()");
937
938 return (0);
939 }
940
/*
 * pick_nextworkqueue_droplock() is called with the list lock held and
 * drops the lock.
 *
 * While the kernel has room (kernel_workq_count < WORKQ_OS_ELEM_MAX), walk
 * the priority levels from index 0 upward and, within each level, rotate
 * through its workqueues calling post_nextworkitem() on each.  The scan
 * restarts from the top whenever wqreadyprio indicates that a queue with a
 * lower priority index became ready, and stops when a full pass posts
 * nothing.
 */
static void
pick_nextworkqueue_droplock(void)
{
	int i, curwqprio, val, found;
	pthread_workqueue_head_t headp;
	pthread_workqueue_t workq;
	pthread_workqueue_t nworkq = NULL;


loop:
	while (kernel_workq_count < WORKQ_OS_ELEM_MAX) {
		found = 0;
		for (i = 0; i < WORKQ_OS_NUMPRIOS; i++) {
			/* There is nothing else higher to run. */
			wqreadyprio = i;
			headp = __pthread_wq_head_tbl[i];

			if (TAILQ_EMPTY(&headp->wqhead))
				continue;
			/* Resume from the saved cursor, if there is one. */
			workq = headp->next_workq;
			if (workq == NULL)
				workq = TAILQ_FIRST(&headp->wqhead);
			curwqprio = workq->queueprio;
			nworkq = workq;
			while (kernel_workq_count < WORKQ_OS_ELEM_MAX) {
				/*
				 * Advance the cursor before posting, since
				 * workq may be freed by post_nextworkitem().
				 */
				headp->next_workq = TAILQ_NEXT(workq, wq_list);
				if (headp->next_workq == NULL)
					headp->next_workq =
					    TAILQ_FIRST(&headp->wqhead);

				val = post_nextworkitem(workq);
				if (val != 0) {
					/*
					 * Things could have changed so let's
					 * reassess. If the kernel queue is
					 * full then skip.
					 */
					if (kernel_workq_count >=
					    WORKQ_OS_ELEM_MAX)
						break;
					/*
					 * If anything with higher prio arrived
					 * then reevaluate.
					 */
					if (wqreadyprio < curwqprio)
						goto loop; /* re-evaluate */
					/*
					 * We can post some more work items.
					 */
					found = 1;
				}

				/*
				 * We cannot use workq here as it could be
				 * freed.
				 */
				if (TAILQ_EMPTY(&headp->wqhead))
					break;
				/*
				 * If we found nothing to run and only one
				 * workqueue in the list, skip.
				 */
				/* workq is only compared here, not read. */
				if ((val == 0) && (workq == headp->next_workq))
					break;
				workq = headp->next_workq;
				if (workq == NULL)
					workq = TAILQ_FIRST(&headp->wqhead);
				if (val != 0)
					nworkq = workq;
				/*
				 * If we found nothing to run then back to workq
				 * where we started.
				 */
				if ((val == 0) && (workq == nworkq))
					break;
			}
			if (kernel_workq_count >= WORKQ_OS_ELEM_MAX)
				break;
		}
		/* Nothing found to run? */
		if (found == 0)
			break;

	}
	workqueue_list_unlock();
}
1031
1032
1033 int
_pthread_workqueue_additem_np(pthread_workqueue_t workq,void * (* workitem_func)(void *),void * workitem_arg,pthread_workitem_handle_t * itemhandlep,unsigned int * gencountp)1034 _pthread_workqueue_additem_np(pthread_workqueue_t workq,
1035 void *( *workitem_func)(void *), void * workitem_arg,
1036 pthread_workitem_handle_t * itemhandlep, unsigned int *gencountp)
1037 {
1038 pthread_workitem_t witem;
1039
1040 if (valid_workq(workq) == 0)
1041 return (EINVAL);
1042
1043 workqueue_list_lock();
1044 /*
1045 * Allocate the workitem here as it can drop the lock. Also we can
1046 * evaluate the workqueue state only once.
1047 */
1048 witem = alloc_workitem();
1049 if (witem == NULL) {
1050 workqueue_list_unlock();
1051 return (ENOMEM);
1052 }
1053 witem->func = workitem_func;
1054 witem->func_arg = workitem_arg;
1055 witem->flags = 0;
1056 witem->workq = workq;
1057 witem->item_entry.tqe_next = 0;
1058 witem->item_entry.tqe_prev = 0;
1059
1060 /* alloc_workitem() can drop the lock, check the state. */
1061 if ((workq->flags &
1062 (PTHREAD_WORKQ_IN_TERMINATE | PTHREAD_WORKQ_DESTROYED)) != 0) {
1063 free_workitem(witem);
1064 workqueue_list_unlock();
1065 *itemhandlep = 0;
1066 return (ESRCH);
1067 }
1068
1069 if (itemhandlep != NULL)
1070 *itemhandlep = (pthread_workitem_handle_t *)witem;
1071 if (gencountp != NULL)
1072 *gencountp = witem->gencount;
1073 TAILQ_INSERT_TAIL(&workq->item_listhead, witem, item_entry);
1074 if (((workq->flags & PTHREAD_WORKQ_BARRIER_ON) == 0) &&
1075 (workq->queueprio < wqreadyprio))
1076 wqreadyprio = workq->queueprio;
1077
1078 pick_nextworkqueue_droplock();
1079
1080 return (0);
1081 }
1082
1083 /*
1084 * Pthread Workqueue support routines.
1085 */
1086 int
_pthread_workqueue_requestconcurrency_np(int queue,int request_concurrency)1087 _pthread_workqueue_requestconcurrency_np(int queue, int request_concurrency)
1088 {
1089 int error = 0;
1090
1091 if (queue < 0 || queue > WORKQ_OS_NUMPRIOS)
1092 return (EINVAL);
1093
1094 error = thr_workq_thread_setconc(queue, request_concurrency);
1095
1096 if (error == -1)
1097 return (errno);
1098
1099 return (0);
1100 }
1101
1102 void
_pthread_workqueue_atfork_prepare(void)1103 _pthread_workqueue_atfork_prepare(void)
1104 {
1105
1106 if (dispatch_atfork_prepare != 0)
1107 dispatch_atfork_prepare();
1108 }
1109
1110 void
_pthread_workqueue_atfork_parent(void)1111 _pthread_workqueue_atfork_parent(void)
1112 {
1113
1114 if (dispatch_atfork_parent != 0)
1115 dispatch_atfork_parent();
1116 }
1117
1118 void
_pthread_workqueue_atfork_child(void)1119 _pthread_workqueue_atfork_child(void)
1120 {
1121
1122 (void)_pthread_spin_destroy(&__workqueue_list_lock);
1123 (void)_pthread_spin_init(&__workqueue_list_lock,
1124 PTHREAD_PROCESS_PRIVATE);
1125 if (kernel_workq_setup != 0) {
1126 kernel_workq_setup = 0;
1127 _pthread_work_internal_init();
1128 }
1129 if (dispatch_atfork_child != 0)
1130 dispatch_atfork_child();
1131 }
1132
1133