xref: /NextBSD/lib/libthr/thread/thr_workq.c (revision 2032d79c5dea581be0a9bed38c15e14efe8c0892)
1 /*-
2  * Copyright (c) 2009-2014, Stacey Son <sson@freebsd.org>
3  * Copyright (c) 2000-2008, Apple Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice unmodified, this list of conditions, and the following
11  *    disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  *
27  * $FreeBSD: $
28  */
29 
30 #include "namespace.h"
31 #include <sys/types.h>
32 #include <sys/queue.h>
33 #include <sys/rtprio.h>
34 #include <sys/signalvar.h>
35 #include <sys/thrworkq.h>
36 #include <errno.h>
37 #include <stdlib.h>
38 #include <string.h>
39 #include <stddef.h>
40 #include <pthread.h>
41 #include <pthread_np.h>
42 #include <pthread_workqueue.h>
43 #include "un-namespace.h"
44 
45 #include "thr_private.h"
46 
/*
 * One queued unit of work.  Items are linked through item_entry onto a
 * workqueue's item lists or onto the free-item pool.
 */
47 typedef struct _pthread_workitem {
48 	TAILQ_ENTRY(_pthread_workitem) item_entry; 	/* pthread_workitem
49 							   list in prio */
	/*
	 * Work function.  For barrier and destroy items the code in this
	 * file casts it to a (workqueue, arg) callback instead.
	 */
50 	void			*(*func)(void *);
51 	void			*func_arg;	/* argument passed to func */
52 	struct _pthread_workqueue *workq;	/* queue the item was added to */
53 	unsigned int		 flags;		/* PTH_WQITEM_* bits */
	/* Incremented each time the item is returned to the pool. */
54 	unsigned int		 gencount;
55 } *pthread_workitem_t;
56 
/*
 * Per-priority list of workqueues.  next_workq is the queue the scheduler
 * (pick_nextworkqueue_droplock()) will look at next; NULL means "start from
 * the first queue on wqhead".
 */
57 typedef struct  _pthread_workqueue_head {
58 	TAILQ_HEAD(, _pthread_workqueue) wqhead;
59 	struct _pthread_workqueue * next_workq;
60 } * pthread_workqueue_head_t;
61 
/*
 * User-level workqueue.  Instances come from the workqueue pool (or
 * malloc()) and are linked onto the per-priority list selected by
 * queueprio.
 */
62 struct  _pthread_workqueue {
	/* Set from _get_curthread() when the queue is initialised. */
63 	struct pthread *mthread;	/* main thread */
64 	unsigned int	sig;	/* Unique signature for this structure */
65 	struct umutex	lock;	/* Used for internal mutex on structure */
66 	TAILQ_ENTRY(_pthread_workqueue) wq_list;  /* workqueue list in prio */
	/* Items waiting to be handed to the kernel. */
67 	TAILQ_HEAD(, _pthread_workitem) item_listhead; /* pthread_workitem
68 							  list in prio */
	/* Items removed from here by workqueue_exit() once they have run. */
69 	TAILQ_HEAD(, _pthread_workitem) item_kernhead;  /* pthread_workitem
70 							   list in prio */
71 	unsigned int	flags;		/* PTHREAD_WORKQ_* bits */
72 	size_t		stacksize;
73 	int		istimeshare;
74 	int		importance;
75 	int		affinity;  /* XXX - not used yet */
76 	int		queueprio;	/* index into __pthread_wq_head_tbl[] */
	/* Outstanding kernel items an armed barrier is still waiting for. */
77 	int		barrier_count;
	/* Items counted as handed to the kernel and not yet finished. */
78 	int		kq_count;
	/* Termination callback and argument, taken from the destroy item. */
79 	void		(*term_callback)(struct _pthread_workqueue *,void *);
80 	void		*term_callarg;
81 	pthread_workqueue_head_t headp;	/* list head for queueprio */
	/* Non-zero: WORKQUEUE_OVERCOMMIT is or'ed into the kernel priority. */
82 	int		overcommit;
	/* NOTE(review): presumably layout padding; not referenced in this file. */
83 #if ! defined(__x86_64__)
84 	unsigned int	rev2[11];
85 #endif
86 };
87 
88 /*
89  * Workqueue flags.
 *
 * Bits kept in the flags member of struct _pthread_workqueue.  The code in
 * this file tests or sets IN_TERMINATE, BARRIER_ON, TERM_ON, DESTROYED,
 * REQUEUED and SUSPEND; IN_CREATION is not referenced here.
90  */
91 #define	PTHREAD_WORKQ_IN_CREATION	0x0001
92 #define	PTHREAD_WORKQ_IN_TERMINATE	0x0002
93 #define	PTHREAD_WORKQ_BARRIER_ON	0x0004
94 #define	PTHREAD_WORKQ_TERM_ON		0x0008
95 #define	PTHREAD_WORKQ_DESTROYED		0x0010
96 #define	PTHREAD_WORKQ_REQUEUED 		0x0020
97 #define	PTHREAD_WORKQ_SUSPEND		0x0040
98 
99 /*
100  * Workitem flags.
 *
 * Bits kept in the flags member of struct _pthread_workitem.  The code in
 * this file uses BARRIER, DESTROY, APPLIED and KERN_COUNT; BARRIER and
 * DESTROY are only tested here, never set.
101  */
102 #define	PTH_WQITEM_INKERNEL_QUEUE	0x0001
103 #define	PTH_WQITEM_RUNNING		0x0002
104 #define	PTH_WQITEM_COMPLETED		0x0004
105 #define	PTH_WQITEM_REMOVED		0x0008
106 #define	PTH_WQITEM_BARRIER 		0x0010
107 #define	PTH_WQITEM_DESTROY		0x0020
108 #define	PTH_WQITEM_NOTINLIST		0x0040
109 #define	PTH_WQITEM_APPLIED		0x0080
110 #define	PTH_WQITEM_KERN_COUNT		0x0100
111 
112 /*
113  * Signatures/magic numbers.
 *
 * Note that both signatures have the same value, so a signature check
 * cannot tell a workqueue from a workqueue attribute structure.
114  */
115 #define	PTHREAD_WORKQUEUE_SIG		0xBEBEBEBE
116 #define	PTHREAD_WORKQUEUE_ATTR_SIG	0xBEBEBEBE
117 
118 /*
119  * Memory pool sizes.
 *
 * Number of workitems and workqueues preallocated by
 * _pthread_work_internal_init().
120  */
121 #define	WORKITEM_POOL_SIZE		1000
122 #define	WORKQUEUE_POOL_SIZE		 100
123 
/* Spin lock taken by workqueue_list_lock()/workqueue_list_unlock(). */
124 static pthread_spinlock_t __workqueue_list_lock;
/* Non-zero once _pthread_work_internal_init() has completed. */
125 static int kernel_workq_setup = 0;
/* Identifier filled in by thr_workq_init() and passed back on later calls. */
126 static int workq_id = 0;
127 static int wqreadyprio = 0; /* current highest prio queue ready with items */
/* Default attributes; sig and queueprio are set during internal init. */
128 static pthread_workqueue_attr_t	_pthread_wq_attr_default = {
129 	.sig = 0,
130 	.queueprio = 0,
131 	.overcommit = 0,
132 };
/* Items handed to the kernel that have not started running yet. */
133 static volatile int32_t kernel_workq_count = 0;
/* Workqueues currently allocated through alloc_workqueue(). */
134 static volatile int32_t user_workq_count = 0;
135 
/* Free lists of workqueues and workitems. */
136 static TAILQ_HEAD(__pthread_workqueue_pool, _pthread_workqueue)
137 	__pthread_workqueue_pool_head = TAILQ_HEAD_INITIALIZER(
138 	    __pthread_workqueue_pool_head);
139 static TAILQ_HEAD(__pthread_workitem_pool, _pthread_workitem)
140 	__pthread_workitem_pool_head = TAILQ_HEAD_INITIALIZER(
141 	    __pthread_workitem_pool_head);
142 
143 
/* One workqueue list head per priority, indexed by queue priority. */
144 struct _pthread_workqueue_head	__pthread_workq0_head;
145 struct _pthread_workqueue_head	__pthread_workq1_head;
146 struct _pthread_workqueue_head	__pthread_workq2_head;
147 pthread_workqueue_head_t __pthread_wq_head_tbl[WORKQ_OS_NUMPRIOS] = {
148 	&__pthread_workq0_head,
149 	&__pthread_workq1_head,
150 	&__pthread_workq2_head
151 };
152 
/* Forward declarations of the file-local helpers. */
153 static void workqueue_list_lock(void);
154 static void workqueue_list_unlock(void);
155 static pthread_workitem_t alloc_workitem(void);
156 static void free_workitem(pthread_workitem_t witem);
157 static pthread_workqueue_t alloc_workqueue(void);
158 static void free_workqueue(pthread_workqueue_t wq);
159 static void _pthread_wqthread(void *arg);
160 static void _pthread_newthread(void *arg);
161 static void _pthread_exitthread(void *arg);
162 static void pick_nextworkqueue_droplock(void);
163 
/*
 * Prototypes for the underscore-prefixed implementations; the public names
 * are attached to them with __weak_reference() below.
 */
164 int _pthread_workqueue_init_np(void);
165 int _pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp);
166 int _pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr);
167 int _pthread_workqueue_attr_getqueuepriority_np(
168 		    const pthread_workqueue_attr_t * attr, int * qpriop);
169 int _pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr,
170 		    int qprio);
171 int _pthread_workqueue_attr_getovercommit_np(
172 		    const pthread_workqueue_attr_t * attr, int * ocommp);
173 int _pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr,
174 		    int ocomm);
175 int _pthread_workqueue_create_np(pthread_workqueue_t * workqp,
176 		    const pthread_workqueue_attr_t * attr);
177 int _pthread_workqueue_additem_np(pthread_workqueue_t workq,
178 		    void *( *workitem_func)(void *), void * workitem_arg,
179 		    pthread_workitem_handle_t * itemhandlep,
180 		    unsigned int *gencountp);
181 int _pthread_workqueue_requestconcurrency_np(int queue,
182 		    int request_concurrency);
183 int _pthread_workqueue_getovercommit_np(pthread_workqueue_t workq,
184 		    unsigned int *ocommp);
185 void _pthread_workqueue_atfork_prepare(void);
186 void _pthread_workqueue_atfork_parent(void);
187 void _pthread_workqueue_atfork_child(void);
188 
/*
 * Export each implementation under its public, non-underscore name as a
 * weak alias.
 */
189 __weak_reference(_pthread_workqueue_init_np, pthread_workqueue_init_np);
190 __weak_reference(_pthread_workqueue_attr_init_np,
191 		    pthread_workqueue_attr_init_np);
192 __weak_reference(_pthread_workqueue_attr_destroy_np,
193 		    pthread_workqueue_attr_destroy_np);
194 __weak_reference(_pthread_workqueue_attr_getqueuepriority_np,
195 		    pthread_workqueue_attr_getqueuepriority_np);
196 __weak_reference(_pthread_workqueue_attr_setqueuepriority_np,
197 		    pthread_workqueue_attr_setqueuepriority_np);
198 __weak_reference(_pthread_workqueue_attr_getovercommit_np,
199 		    pthread_workqueue_attr_getovercommit_np);
200 __weak_reference(_pthread_workqueue_attr_setovercommit_np,
201 		    pthread_workqueue_attr_setovercommit_np);
202 __weak_reference(_pthread_workqueue_getovercommit_np,
203 		    pthread_workqueue_getovercommit_np);
204 __weak_reference(_pthread_workqueue_create_np, pthread_workqueue_create_np);
205 __weak_reference(_pthread_workqueue_additem_np, pthread_workqueue_additem_np);
206 __weak_reference(_pthread_workqueue_requestconcurrency_np,
207 		    pthread_workqueue_requestconcurrency_np);
208 __weak_reference(_pthread_workqueue_atfork_prepare,
209     		    pthread_workqueue_atfork_prepare);
210 __weak_reference(_pthread_workqueue_atfork_parent,
211     		    pthread_workqueue_atfork_parent);
212 __weak_reference(_pthread_workqueue_atfork_child,
213     		    pthread_workqueue_atfork_child);
214 
215 /*
216  * dispatch_atfork_{prepare,parent,child}(void) are provided by libdispatch,
217  * which may not be linked.  They are declared weak so that the atfork
 * handlers in this file can test each symbol against 0 before calling it.
218  */
219 __attribute__((weak)) void dispatch_atfork_prepare(void);
220 __attribute__((weak)) void dispatch_atfork_parent(void);
221 __attribute__((weak)) void dispatch_atfork_child(void);
222 
/*
 * Atomic increment/decrement of an integer counter.
 *
 * The two variants do not return the same thing: __sync_add_and_fetch() and
 * __sync_sub_and_fetch() yield the updated value, while
 * atomic_fetchadd_int() yields the value before the addition.  Every use in
 * this file discards the result, so do not start relying on it.
 */
223 #if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2)
224 #define ATOMIC_INC(p)		__sync_add_and_fetch((p), 1)
225 #define ATOMIC_DEC(p)		__sync_sub_and_fetch((p), 1)
226 #else
227 #define	ATOMIC_INC(p)		atomic_fetchadd_int(p, 1)
228 #define	ATOMIC_DEC(p)		atomic_fetchadd_int(p, -1)
229 #endif
230 
231 static void
workqueue_list_lock(void)232 workqueue_list_lock(void)
233 {
234 
235 	_pthread_spin_lock(&__workqueue_list_lock);
236 }
237 
238 static void
workqueue_list_unlock(void)239 workqueue_list_unlock(void)
240 {
241 
242  	_pthread_spin_unlock(&__workqueue_list_lock);
243 }
244 
245 /*
246  * Round up size to the nearest multiple of _thr_page_size.
247  */
248 static size_t
round_up(size_t size)249 round_up(size_t size)
250 {
251 
252 	if (size % _thr_page_size != 0)
253 		size = ((size / _thr_page_size) + 1) *
254 		    _thr_page_size;
255 	return (size);
256 }
257 
258 static int
thr_workq_init(int * retid,struct pthread_attr * attr)259 thr_workq_init(int *retid, struct pthread_attr *attr)
260 {
261 	struct twq_param twq;
262 
263 	twq.twq_retid = retid;
264 	twq.twq_workqfunc = _pthread_wqthread;
265 	twq.twq_newtdfunc = _pthread_newthread;
266 	twq.twq_exitfunc = _pthread_exitthread;
267 	twq.twq_stacksize = round_up(attr->stacksize_attr);
268 	twq.twq_guardsize = round_up(attr->guardsize_attr);
269 
270 	return (thr_workq(WQOPS_INIT, &twq));
271 }
272 
273 static int
thr_workq_thread_return(void)274 thr_workq_thread_return(void)
275 {
276 	struct twq_param twq;
277 
278 	twq.twq_id = workq_id;
279 	return (thr_workq(WQOPS_THREAD_RETURN, &twq));
280 }
281 
282 static int
thr_workq_queue_add(pthread_workitem_t witem,int affinity,int prio)283 thr_workq_queue_add(pthread_workitem_t witem, int affinity, int prio)
284 {
285 	struct twq_param twq;
286 
287 	twq.twq_id = workq_id;
288 	twq.twq_add_item = (void *)witem;
289 	twq.twq_add_affin = affinity;
290 	twq.twq_add_prio = prio;
291 
292 	return (thr_workq(WQOPS_QUEUE_ADD, &twq));
293 }
294 
295 static int
thr_workq_thread_setconc(int queue,int request_concurrency)296 thr_workq_thread_setconc(int queue, int request_concurrency)
297 {
298 	struct twq_param twq;
299 
300 	twq.twq_id = workq_id;
301 	twq.twq_setconc_prio = queue;
302 	twq.twq_setconc_conc = request_concurrency;
303 
304 	return (thr_workq(WQOPS_THREAD_SETCONC, &twq));
305 }
306 
/*
 * Called by a worker thread after its work function has returned; does not
 * return to the caller (it ends in _pthread_exit()).
 *
 * Under the list lock the finished item is removed from the queue's
 * item_kernhead list and recycled.  If a barrier is armed on the queue,
 * barrier_count is decremented; when it reaches zero the barrier item at
 * the head of item_listhead is retired (its callback, if it is a real
 * barrier item with one, runs with the lock dropped).  If termination is
 * also armed, the queue is unlinked from its priority list, invalidated,
 * its termination callback runs (lock dropped) and it is returned to the
 * pool.
 *
 * Finally more work is scheduled via pick_nextworkqueue_droplock(), which
 * releases the lock, and the thread is handed back to the kernel.
 *
 * The self parameter is not used in the body.
 * Assumes item is linked on workq->item_kernhead.
 */
307 static void
workqueue_exit(pthread_t self,pthread_workqueue_t workq,pthread_workitem_t item)308 workqueue_exit(pthread_t self, pthread_workqueue_t workq,
309     pthread_workitem_t item)
310 {
311 	pthread_workitem_t baritem;
312 	pthread_workqueue_head_t headp;
313 	void (*func)(pthread_workqueue_t, void *);
314 
315 	workqueue_list_lock();
316 	TAILQ_REMOVE(&workq->item_kernhead, item, item_entry);
317 	workq->kq_count--;
318 
319 	item->flags = 0;
320 	free_workitem(item);
321 
322 	if ((workq->flags & PTHREAD_WORKQ_BARRIER_ON) ==
323 	    PTHREAD_WORKQ_BARRIER_ON) {
324 		workq->barrier_count--;
325 
		/* Last outstanding item the barrier was waiting for. */
326 		if (workq->barrier_count <= 0 ) {
327 			/* Need to remove barrier item from the list. */
			/*
			 * NOTE(review): assumes the list is non-empty here;
			 * baritem is dereferenced without a NULL check.
			 */
328 			baritem = TAILQ_FIRST(&workq->item_listhead);
329 
330 			/*
331 			 * If the front item is a barrier and call back is
332 			 * registered, run that.
333 			 */
334 			if (((baritem->flags & PTH_WQITEM_BARRIER) ==
335 				PTH_WQITEM_BARRIER) && (baritem->func != NULL)){
336 
				/* The callback runs without the list lock. */
337 				workqueue_list_unlock();
338 				func = (void (*)(pthread_workqueue_t, void *))
339 				    baritem->func;
340 				(*func)(workq, baritem->func_arg);
341 				workqueue_list_lock();
342 			}
343 			TAILQ_REMOVE(&workq->item_listhead, baritem,
344 			    item_entry);
345 			baritem->flags = 0;
346 			free_workitem(baritem);
347 			workq->flags &= ~PTHREAD_WORKQ_BARRIER_ON;
348 			if ((workq->flags & PTHREAD_WORKQ_TERM_ON) != 0) {
349 				headp = __pthread_wq_head_tbl[workq->queueprio];
350 				workq->flags |= PTHREAD_WORKQ_DESTROYED;
				/*
				 * Do not leave the round-robin cursor pointing
				 * at the queue that is about to be unlinked.
				 */
351 				if (headp->next_workq == workq) {
352 					headp->next_workq =
353 					    TAILQ_NEXT(workq, wq_list);
354 					if (headp->next_workq == NULL) {
355 						headp->next_workq =
356 						    TAILQ_FIRST(&headp->wqhead);
357 						if (headp->next_workq == workq)
358 							headp->next_workq=NULL;
359 					}
360 				}
361 				TAILQ_REMOVE(&headp->wqhead, workq, wq_list);
				/* Clearing sig makes valid_workq() reject it. */
362 				workq->sig = 0;
363 				if (workq->term_callback != NULL) {
364 					workqueue_list_unlock();
365 					(*workq->term_callback)(workq,
366 					    workq->term_callarg);
367 					workqueue_list_lock();
368 				}
369 				free_workqueue(workq);
370 			} else {
371 				/*
372 				 * If there are higher prio item then reset
373 				 * to wqreadyprio.
374 				 */
375 				if ((workq->queueprio < wqreadyprio) &&
376 				    (!(TAILQ_EMPTY(&workq->item_listhead))))
377 					wqreadyprio = workq->queueprio;
378 			}
379 		}
380 	}
381 
	/* Schedules more work and releases the list lock. */
382 	pick_nextworkqueue_droplock();
383 
384 	(void)thr_workq_thread_return();
385 
386 	_pthread_exit(NULL);
387 }
388 
389 
390 /* XXX need to compare notes to thr_create()'s version */
391 static void
_pthread_wqthread(void * arg)392 _pthread_wqthread(void *arg)
393 {
394 	pthread_workitem_t item = (pthread_workitem_t)arg;
395 	pthread_workqueue_t workq;
396 	pthread_t self = _pthread_self();
397 
398 	/* debug serialization */
399 	THR_LOCK(self);
400 	THR_UNLOCK(self);
401 
402 	workq = item->workq;
403 	ATOMIC_DEC(&kernel_workq_count);
404 
405 	(*item->func)(item->func_arg);
406 
407 	workqueue_exit(self, workq, item);
408 
409 	/* NOT REACHED */
410 }
411 
/*
 * Entry point registered with the kernel as twq_newtdfunc (see
 * thr_workq_init()).  arg is the pthread_workitem_t to run.
 *
 * The thread was started by the kernel, so this routine first builds the
 * user-level side: it allocates a struct pthread, fills in the basic
 * fields, records the thread id, installs the TCB and links the thread
 * into the thread list.  It then runs the work item and retires it through
 * workqueue_exit(), which does not return.
 *
 * NOTE(review): if _thr_alloc() fails, _pthread_exit() is called before
 * any thread structure or TCB exists for this thread -- confirm that this
 * is safe.
 */
412 static void
_pthread_newthread(void * arg)413 _pthread_newthread(void *arg)
414 {
415 	pthread_workitem_t item = (pthread_workitem_t)arg;
416 	pthread_workqueue_t workq;
417 	struct pthread *newthread, *mthread;
418 
419 	/*
420 	 * This thread has been initiated by the kernel but we need to allocate
421 	 * the user land now including the TLS.
422 	 */
423 
424 	workq = item->workq;
	/* The thread recorded in the workqueue at creation acts as parent. */
425 	mthread = workq->mthread;
426 
427 	if ((newthread = _thr_alloc(mthread)) == NULL)
428 		_pthread_exit(NULL);  /* XXX Return some error code? */
429 
430 	/*
431 	 * Init the thread structure.
432 	 */
433 
434 	/* Use the default thread attributes. */
435 	newthread->attr = _pthread_attr_default;
436 
437 	newthread->magic = THR_MAGIC;
438 	newthread->start_routine = item->func;
439 	newthread->arg = item->func_arg;
440 	newthread->cancel_enable = 1;
441 	newthread->cancel_async = 0;
442 	/* Initialize the mutex queue: */
443 	TAILQ_INIT(&newthread->mutexq);
444 	TAILQ_INIT(&newthread->pp_mutexq);
445 	newthread->refcount = 1;
446 
447 	/*
448 	 * This thread's stack will be recycled in the kernel so record
449 	 * its address as NULL.
450 	 */
451 	newthread->attr.stackaddr_attr = NULL;
452 
453 	/*
454 	 * Get the Thread ID and set the automatic TLS.
455 	 * XXX It seems we could reduce this to one syscall.
456 	 */
457 	(void)thr_self(&newthread->tid);
458 	_tcb_set(newthread->tcb);
459 
460 	_thr_link(mthread, newthread);
461 
	/* Report the creation event when the parent asks for it. */
462 	if (SHOULD_REPORT_EVENT(mthread, TD_CREATE)) {
463 		THR_THREAD_LOCK(mthread, newthread);
464 		_thr_report_creation(mthread, newthread);
465 		THR_THREAD_UNLOCK(mthread, newthread);
466 	}
467 
	/* Same lock/unlock pair as in _pthread_wqthread(). */
468 	THR_LOCK(newthread);
469 	THR_UNLOCK(newthread);
470 
471 	/*
472 	 * Put the new thread to work.
473 	 */
474 	ATOMIC_DEC(&kernel_workq_count);
475 
476 	(*item->func)(item->func_arg);
477 
478 	workqueue_exit(newthread, workq, item);
479 
480 	/* NOT REACHED */
481 }
482 
/*
 * Entry point registered with the kernel as twq_exitfunc.  A thread that
 * is started here is only being asked to go away (the workqueue is
 * shutting down), so it exits at once.  arg is ignored.
 */
static void
_pthread_exitthread(void *arg)
{

	(void)arg;
	_pthread_exit(NULL);
}
493 
494 static int
_pthread_work_internal_init(void)495 _pthread_work_internal_init(void)
496 {
497 	int i;
498 	pthread_workqueue_head_t headp;
499 	pthread_workitem_t witemp;
500 	pthread_workqueue_t wq;
501 	pthread_t curthread = _get_curthread();
502 
503 	if (kernel_workq_setup == 0) {
504 
505 		_pthread_wq_attr_default.queueprio = WORKQ_DEFAULT_PRIOQUEUE;
506 		_pthread_wq_attr_default.sig = PTHREAD_WORKQUEUE_ATTR_SIG;
507 
508 		for( i = 0; i< WORKQ_OS_NUMPRIOS; i++) {
509 			headp = __pthread_wq_head_tbl[i];
510 			TAILQ_INIT(&headp->wqhead);
511 			headp->next_workq = 0;
512 		}
513 
514 		/* create work item and workqueue pools */
515 		witemp = (struct _pthread_workitem *)malloc(
516 		    sizeof(struct _pthread_workitem) * WORKITEM_POOL_SIZE);
517 		if (witemp == NULL)
518 			return (ENOMEM);
519 		bzero(witemp, (sizeof(struct _pthread_workitem) *
520 			WORKITEM_POOL_SIZE));
521 		for (i = 0; i < WORKITEM_POOL_SIZE; i++) {
522 			TAILQ_INSERT_TAIL(&__pthread_workitem_pool_head,
523 			    &witemp[i], item_entry);
524 		}
525 		wq = (struct _pthread_workqueue *)malloc(
526 		    sizeof(struct _pthread_workqueue) * WORKQUEUE_POOL_SIZE);
527 		if (wq == NULL) {
528 			free(witemp);
529 			return (ENOMEM);
530 		}
531 		bzero(wq, (sizeof(struct _pthread_workqueue) *
532 			WORKQUEUE_POOL_SIZE));
533 		for (i = 0; i < WORKQUEUE_POOL_SIZE; i++) {
534 			TAILQ_INSERT_TAIL(&__pthread_workqueue_pool_head,
535 			    &wq[i], wq_list);
536 		}
537 
538 		/* XXX need to use the workqueue attr's. */
539 		if (thr_workq_init(&workq_id, &curthread->attr)) {
540 			free(wq);
541 			free(witemp);
542 			return (errno);
543 		}
544 
545 		kernel_workq_setup = 1;
546 	}
547 
548 	return (0);
549 }
550 
551 static void
_pthread_workq_init(pthread_workqueue_t wq,const pthread_workqueue_attr_t * attr)552 _pthread_workq_init(pthread_workqueue_t wq,
553     const pthread_workqueue_attr_t * attr)
554 {
555 
556 	bzero(wq, sizeof(struct _pthread_workqueue));
557 	if (attr != NULL) {
558 		wq->queueprio = attr->queueprio;
559 		wq->overcommit = attr->overcommit;
560 	} else {
561 		wq->queueprio = WORKQ_DEFAULT_PRIOQUEUE;
562 		wq->overcommit = 0;
563 	}
564 	wq->flags = 0;
565 	TAILQ_INIT(&wq->item_listhead);
566 	TAILQ_INIT(&wq->item_kernhead);
567 	wq->wq_list.tqe_next = 0;
568 	wq->wq_list.tqe_prev = 0;
569 	wq->sig = PTHREAD_WORKQUEUE_SIG;
570 	wq->headp = __pthread_wq_head_tbl[wq->queueprio];
571 	wq->mthread = _get_curthread();
572 	wq->affinity = -1;  /* XXX not used yet. */
573 }
574 
575 int
_pthread_workqueue_init_np(void)576 _pthread_workqueue_init_np(void)
577 {
578 	int ret;
579 
580 	_thr_check_init();
581 	_pthread_spin_init(&__workqueue_list_lock, PTHREAD_PROCESS_PRIVATE);
582 	workqueue_list_lock();
583 	/* XXX - _pthread_attr_init(&_pthread_attr_default); */
584 	ret =_pthread_work_internal_init();
585 	workqueue_list_unlock();
586 
587 	return(ret);
588 }
589 
590 /*
591  * Pthread Workqueue Attributes.
592  */
593 int
_pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp)594 _pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp)
595 {
596 
597 	attrp->queueprio = WORKQ_DEFAULT_PRIOQUEUE;
598 	attrp->sig = PTHREAD_WORKQUEUE_ATTR_SIG;
599 	attrp->overcommit = 0;
600 	return (0);
601 }
602 
603 int
_pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr)604 _pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr)
605 {
606 
607 	if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG)
608 		return (0);
609 	else
610 		return (EINVAL); /* Not an attribute struct. */
611 }
612 
613 int
_pthread_workqueue_attr_getqueuepriority_np(const pthread_workqueue_attr_t * attr,int * qpriop)614 _pthread_workqueue_attr_getqueuepriority_np(
615     const pthread_workqueue_attr_t * attr, int * qpriop)
616 {
617 
618 	if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
619 		*qpriop = attr->queueprio;
620 		return (0);
621 	} else
622 		return (EINVAL); /* Not an atribute struct. */
623 }
624 
625 int
_pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr,int qprio)626 _pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr,
627     int qprio)
628 {
629 
630 	if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
631 		switch(qprio) {
632 		case WORKQ_HIGH_PRIOQUEUE:
633 		case WORKQ_DEFAULT_PRIOQUEUE:
634 		case WORKQ_LOW_PRIOQUEUE:
635 			attr->queueprio = qprio;
636 			return (0);
637 		default:
638 			return (EINVAL);
639 		}
640 	} else
641 		return (EINVAL);
642 }
643 
644 int
_pthread_workqueue_attr_getovercommit_np(const pthread_workqueue_attr_t * attr,int * ocommp)645 _pthread_workqueue_attr_getovercommit_np(const pthread_workqueue_attr_t * attr,
646     int * ocommp)
647 {
648 
649 	if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
650 		*ocommp = attr->overcommit;
651 		return (0);
652 	} else
653 		return (EINVAL); /* Not an attribute struct. */
654 }
655 
656 int
_pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr,int ocomm)657 _pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr,
658     int ocomm)
659 {
660 
661 	if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) {
662 		attr->overcommit = ocomm;
663 		return (0);
664 	} else
665 		return (EINVAL);
666 }
667 
668 
669 static int
valid_workq(pthread_workqueue_t workq)670 valid_workq(pthread_workqueue_t workq)
671 {
672 
673 	if (workq->sig == PTHREAD_WORKQUEUE_SIG)
674 		return (1);
675 	else
676 		return (0);
677 }
678 
679 int
_pthread_workqueue_getovercommit_np(pthread_workqueue_t workq,unsigned int * ocommp)680 _pthread_workqueue_getovercommit_np(pthread_workqueue_t workq,
681     unsigned int *ocommp)
682 {
683 
684 	if (valid_workq(workq) == 0)
685 		return (EINVAL);
686 
687 	if (ocommp != NULL)
688 		*ocommp = workq->overcommit;
689 
690 	return (0);
691 }
692 
693 /*
694  * Pthread Workqueue support routines.
695  */
696 int
_pthread_workqueue_create_np(pthread_workqueue_t * workqp,const pthread_workqueue_attr_t * attr)697 _pthread_workqueue_create_np(pthread_workqueue_t * workqp,
698     const pthread_workqueue_attr_t * attr)
699 {
700 	pthread_workqueue_t wq;
701 	pthread_workqueue_head_t headp;
702 	int error;
703 
704 	if ((attr != NULL) && (attr->sig != PTHREAD_WORKQUEUE_ATTR_SIG))
705 		return (EINVAL);
706 
707 	_thr_check_init();
708 
709 	workqueue_list_lock();
710 	if (kernel_workq_setup == 0) {
711 		error = _pthread_work_internal_init();
712 		if (error) {
713 			workqueue_list_unlock();
714 			return (error);
715 		}
716 	}
717 
718 	wq = alloc_workqueue();
719 	if (wq == NULL) {
720 		workqueue_list_unlock();
721 		return (ENOMEM);
722 	}
723 
724 	_pthread_workq_init(wq, attr);
725 
726 	headp = __pthread_wq_head_tbl[wq->queueprio];
727 	TAILQ_INSERT_TAIL(&headp->wqhead, wq, wq_list);
728 	if (headp->next_workq == NULL)
729 		headp->next_workq = TAILQ_FIRST(&headp->wqhead);
730 	workqueue_list_unlock();
731 
732 	*workqp = wq;
733 
734 	return (0);
735 }
736 
737 /*
738  * alloc_workitem() is called with the list lock held.  It will drop the lock
739  * if we need to actually alocate memory.
740  */
741 static pthread_workitem_t
alloc_workitem(void)742 alloc_workitem(void)
743 {
744 	pthread_workitem_t witem;
745 
746 	if (TAILQ_EMPTY(&__pthread_workitem_pool_head)) {
747 		workqueue_list_unlock();
748 		witem = malloc(sizeof(struct _pthread_workitem));
749 		if (witem == NULL)
750 			return (NULL);
751 		witem->gencount = 0;
752 		workqueue_list_lock();
753 	} else {
754 		witem = TAILQ_FIRST(&__pthread_workitem_pool_head);
755 		TAILQ_REMOVE(&__pthread_workitem_pool_head, witem, item_entry);
756 	}
757 	return (witem);
758 }
759 
760 /*
761  * free_workitem() is called with the list lock held.
762  */
763 static void
free_workitem(pthread_workitem_t witem)764 free_workitem(pthread_workitem_t witem)
765 {
766 
767 	witem->gencount++;
768 	TAILQ_INSERT_TAIL(&__pthread_workitem_pool_head, witem, item_entry);
769 }
770 
771 /*
772  * alloc_workqueue() is called with list lock held.
773  */
774 static pthread_workqueue_t
alloc_workqueue(void)775 alloc_workqueue(void)
776 {
777 	pthread_workqueue_t wq;
778 
779 	if (TAILQ_EMPTY(&__pthread_workqueue_pool_head)) {
780 		 workqueue_list_unlock();
781 		 wq = malloc(sizeof(struct _pthread_workqueue));
782 		 if (wq == NULL)
783 			 return (NULL);
784 		 workqueue_list_lock();
785 	} else {
786 		wq = TAILQ_FIRST(&__pthread_workqueue_pool_head);
787 		TAILQ_REMOVE(&__pthread_workqueue_pool_head, wq, wq_list);
788 	}
789 	user_workq_count++;
790 	return(wq);
791 }
792 
793 /*
794  * free_workqueue() is called with list lock held.
795  */
796 static void
free_workqueue(pthread_workqueue_t wq)797 free_workqueue(pthread_workqueue_t wq)
798 {
799 
800 	user_workq_count--;
801 	TAILQ_INSERT_TAIL(&__pthread_workqueue_pool_head, wq, wq_list);
802 }
803 
804 static int
post_nextworkitem(pthread_workqueue_t workq)805 post_nextworkitem(pthread_workqueue_t workq)
806 {
807 	int error, prio;
808 	pthread_workitem_t witem;
809 	pthread_workqueue_head_t headp;
810 	void (*func)(pthread_workqueue_t, void *);
811 
812 	if ((workq->flags & PTHREAD_WORKQ_SUSPEND) == PTHREAD_WORKQ_SUSPEND)
813 		return (0);
814 
815 	if (TAILQ_EMPTY(&workq->item_listhead))
816 		return (0);
817 
818 	if ((workq->flags & PTHREAD_WORKQ_BARRIER_ON) ==
819 	    PTHREAD_WORKQ_BARRIER_ON)
820 		return (0);
821 
822 	witem = TAILQ_FIRST(&workq->item_listhead);
823 	headp = workq->headp;
824 
825 	if ((witem->flags & PTH_WQITEM_BARRIER) == PTH_WQITEM_BARRIER) {
826 		if ((witem->flags & PTH_WQITEM_APPLIED) != 0)
827 			return (0);
828 
829 		/*
830 		 * Also barrier when nothing needs to be handled and
831 		 * nothing to wait for.
832 		 */
833 		if (workq->kq_count != 0) {
834 			witem->flags |= PTH_WQITEM_APPLIED;
835 			workq->flags |= PTHREAD_WORKQ_BARRIER_ON;
836 			workq->barrier_count = workq->kq_count;
837 
838 			return (1);
839 		} else {
840 			if (witem->func != NULL) {
841 				/* We are going to drop list lock. */
842 				witem->flags |= PTH_WQITEM_APPLIED;
843 				workq->flags |= PTHREAD_WORKQ_BARRIER_ON;
844 				workqueue_list_unlock();
845 
846 				func = (void (*)(pthread_workqueue_t, void *))
847 				    witem->func;
848 				(*func)(workq, witem->func_arg);
849 
850 				workqueue_list_lock();
851 				workq->flags &= ~PTHREAD_WORKQ_BARRIER_ON;
852 			}
853 			TAILQ_REMOVE(&workq->item_listhead, witem, item_entry);
854 			witem->flags = 0;
855 			free_workitem(witem);
856 
857 			return (1);
858 		}
859 	} else if ((witem->flags & PTH_WQITEM_DESTROY) == PTH_WQITEM_DESTROY) {
860 		if ((witem->flags & PTH_WQITEM_APPLIED) != 0)
861 			return (0);
862 		witem->flags |= PTH_WQITEM_APPLIED;
863 		workq->flags |=
864 		    (PTHREAD_WORKQ_BARRIER_ON | PTHREAD_WORKQ_TERM_ON);
865 		workq->barrier_count = workq->kq_count;
866 		workq->term_callback =
867 		    (void (*)(struct _pthread_workqueue *,void *))witem->func;
868 		workq->term_callarg = witem->func_arg;
869 		TAILQ_REMOVE(&workq->item_listhead, witem, item_entry);
870 
871 		if ((TAILQ_EMPTY(&workq->item_listhead)) &&
872 		    (workq->kq_count == 0)) {
873 			witem->flags = 0;
874 			free_workitem(witem);
875 			workq->flags |= PTHREAD_WORKQ_DESTROYED;
876 
877 			headp = __pthread_wq_head_tbl[workq->queueprio];
878 			if (headp->next_workq == workq) {
879 				headp->next_workq = TAILQ_NEXT(workq, wq_list);
880 				if (headp->next_workq == NULL) {
881 					headp->next_workq =
882 					    TAILQ_FIRST(&headp->wqhead);
883 					if (headp->next_workq == workq)
884 						headp->next_workq = NULL;
885 				}
886 			}
887 			workq->sig = 0;
888 			TAILQ_REMOVE(&headp->wqhead, workq, wq_list);
889 			if (workq->term_callback != NULL) {
890 				workqueue_list_unlock();
891 				(*workq->term_callback)(workq,
892 				    workq->term_callarg);
893 				workqueue_list_lock();
894 			}
895 			free_workqueue(workq);
896 			return (1);
897 		} else
898 			TAILQ_INSERT_HEAD(&workq->item_listhead, witem,
899 			    item_entry);
900 
901 		return (1);
902 
903 	} else {
904 		 TAILQ_REMOVE(&workq->item_listhead, witem, item_entry);
905 		 if ((witem->flags & PTH_WQITEM_KERN_COUNT) == 0) {
906 			 workq->kq_count++;
907 			 witem->flags |= PTH_WQITEM_KERN_COUNT;
908 		 }
909 		 ATOMIC_INC(&kernel_workq_count);
910 
911 		 workqueue_list_unlock();
912 
913 		 prio = workq->queueprio;
914 		 if (workq->overcommit != 0)
915 			 prio |= WORKQUEUE_OVERCOMMIT;
916 
917 		 if ((error = thr_workq_queue_add(witem,
918 			     workq->affinity, prio)) == -1) {
919 			 ATOMIC_DEC(&kernel_workq_count);
920 
921 			 workqueue_list_lock();
922 			 TAILQ_REMOVE(&workq->item_kernhead, witem, item_entry);
923 			 TAILQ_INSERT_HEAD(&workq->item_listhead, witem,
924 			     item_entry);
925 			 if ((workq->flags & (PTHREAD_WORKQ_BARRIER_ON |
926 				     PTHREAD_WORKQ_TERM_ON)) != 0)
927 				 workq->flags |= PTHREAD_WORKQ_REQUEUED;
928 		 } else
929 		 	workqueue_list_lock();
930 
931 		 return (1);
932 	}
933 
934 	/* NOT REACHED. */
935 
936 	PANIC("Error in logic for post_nextworkitem()");
937 
938 	return (0);
939 }
940 
941 /*
942  * pick_nextworkqueue_droplock() is called with the list lock held and
943  * drops the lock.
 *
 * While the number of items pending in the kernel (kernel_workq_count) is
 * below WORKQ_OS_ELEM_MAX, the priority lists are scanned from index 0
 * upwards.  Within one priority the workqueues are visited round-robin,
 * starting at headp->next_workq, and post_nextworkitem() is called on each
 * of them.  The whole scan is repeated as long as a pass made progress.
 *
 * post_nextworkitem() can drop the lock and can free the workqueue it was
 * given, which is why the cursor is advanced before the call and why
 * wqreadyprio is re-examined afterwards.
944  */
945 static void
pick_nextworkqueue_droplock(void)946 pick_nextworkqueue_droplock(void)
947 {
948 	int i, curwqprio, val, found;
949 	pthread_workqueue_head_t headp;
950 	pthread_workqueue_t workq;
	/* Queue at which the current round-robin sweep started. */
951 	pthread_workqueue_t nworkq = NULL;
952 
953 
954 loop:
955 	while (kernel_workq_count < WORKQ_OS_ELEM_MAX) {
956 		found = 0;
957 		for (i = 0; i < WORKQ_OS_NUMPRIOS; i++)  {
958 			/* There is nothing else higher to run. */
959 			wqreadyprio = i;
960 			headp = __pthread_wq_head_tbl[i];
961 
962 			if (TAILQ_EMPTY(&headp->wqhead))
963 				continue;
964 			workq = headp->next_workq;
965 			if (workq == NULL)
966 				workq = TAILQ_FIRST(&headp->wqhead);
967 			curwqprio = workq->queueprio;
968 			nworkq = workq;
969 			while (kernel_workq_count < WORKQ_OS_ELEM_MAX) {
				/* Advance the cursor first, wrapping around. */
970 				headp->next_workq = TAILQ_NEXT(workq, wq_list);
971 				if (headp->next_workq == NULL)
972 					headp->next_workq =
973 					    TAILQ_FIRST(&headp->wqhead);
974 
975 				val = post_nextworkitem(workq);
976 				if (val != 0) {
977 					/*
978 					 * Things could have changed so let's
979 					 * reassess.  If the kernel queue is
980 					 * full then skip.
981 					 */
982 					if (kernel_workq_count >=
983 					    WORKQ_OS_ELEM_MAX)
984 						break;
985 					/*
986 					 * If anything with higher prio arrived
987 					 * then reevaluate.
988 					 */
989 					if (wqreadyprio < curwqprio)
990 						goto loop; /* re-evaluate */
991 					/*
992 					 * We can post some more work items.
993 					 */
994 					found = 1;
995 				}
996 
997 				/*
998 				 * We cannot use workq here as it could be
999 				 * freed.
1000 				 */
1001 				if (TAILQ_EMPTY(&headp->wqhead))
1002 					break;
1003 				/*
1004 				 * If we found nothing to run and only one
1005 				 * workqueue in the list, skip.
1006 				 */
1007 				if ((val == 0) && (workq == headp->next_workq))
1008 					break;
1009 				workq = headp->next_workq;
1010 				if (workq == NULL)
1011 					workq = TAILQ_FIRST(&headp->wqhead);
				/* Progress was made: restart the sweep from here. */
1012 				if (val != 0)
1013 					nworkq = workq;
1014 				/*
1015 				 * If we found nothing to run then back to workq
1016 				 * where we started.
1017 				 */
1018 				if ((val == 0) && (workq == nworkq))
1019 					break;
1020 			}
1021 			if (kernel_workq_count >= WORKQ_OS_ELEM_MAX)
1022 				break;
1023 		}
1024 		/* Nothing found to run? */
1025 		if (found == 0)
1026 			break;
1027 
1028 	}
1029 	workqueue_list_unlock();
1030 }
1031 
1032 
1033 int
_pthread_workqueue_additem_np(pthread_workqueue_t workq,void * (* workitem_func)(void *),void * workitem_arg,pthread_workitem_handle_t * itemhandlep,unsigned int * gencountp)1034 _pthread_workqueue_additem_np(pthread_workqueue_t workq,
1035     void *( *workitem_func)(void *), void * workitem_arg,
1036     pthread_workitem_handle_t * itemhandlep, unsigned int *gencountp)
1037 {
1038 	pthread_workitem_t witem;
1039 
1040 	if (valid_workq(workq) == 0)
1041 		return (EINVAL);
1042 
1043 	workqueue_list_lock();
1044 	/*
1045 	 * Allocate the workitem here as it can drop the lock.  Also we can
1046 	 * evaluate the workqueue state only once.
1047 	 */
1048 	witem = alloc_workitem();
1049 	if (witem == NULL) {
1050 		workqueue_list_unlock();
1051 		return (ENOMEM);
1052 	}
1053 	witem->func = workitem_func;
1054 	witem->func_arg = workitem_arg;
1055 	witem->flags = 0;
1056 	witem->workq = workq;
1057 	witem->item_entry.tqe_next = 0;
1058 	witem->item_entry.tqe_prev = 0;
1059 
1060 	/* alloc_workitem() can drop the lock, check the state. */
1061 	if ((workq->flags &
1062 		(PTHREAD_WORKQ_IN_TERMINATE | PTHREAD_WORKQ_DESTROYED)) != 0) {
1063 		free_workitem(witem);
1064 		workqueue_list_unlock();
1065 		*itemhandlep = 0;
1066 		return (ESRCH);
1067 	}
1068 
1069 	if (itemhandlep != NULL)
1070 		*itemhandlep = (pthread_workitem_handle_t *)witem;
1071 	if (gencountp != NULL)
1072 		*gencountp = witem->gencount;
1073 	TAILQ_INSERT_TAIL(&workq->item_listhead, witem, item_entry);
1074 	if (((workq->flags & PTHREAD_WORKQ_BARRIER_ON) == 0) &&
1075 	    (workq->queueprio < wqreadyprio))
1076 		wqreadyprio = workq->queueprio;
1077 
1078 	pick_nextworkqueue_droplock();
1079 
1080 	return (0);
1081 }
1082 
1083 /*
1084  * Pthread Workqueue support routines.
1085  */
1086 int
_pthread_workqueue_requestconcurrency_np(int queue,int request_concurrency)1087 _pthread_workqueue_requestconcurrency_np(int queue, int request_concurrency)
1088 {
1089 	int error = 0;
1090 
1091 	if (queue < 0 || queue > WORKQ_OS_NUMPRIOS)
1092 		return (EINVAL);
1093 
1094 	error = thr_workq_thread_setconc(queue, request_concurrency);
1095 
1096 	if (error == -1)
1097 		return (errno);
1098 
1099 	return (0);
1100 }
1101 
1102 void
_pthread_workqueue_atfork_prepare(void)1103 _pthread_workqueue_atfork_prepare(void)
1104 {
1105 
1106 	if (dispatch_atfork_prepare != 0)
1107 		dispatch_atfork_prepare();
1108 }
1109 
1110 void
_pthread_workqueue_atfork_parent(void)1111 _pthread_workqueue_atfork_parent(void)
1112 {
1113 
1114 	if (dispatch_atfork_parent != 0)
1115 		dispatch_atfork_parent();
1116 }
1117 
1118 void
_pthread_workqueue_atfork_child(void)1119 _pthread_workqueue_atfork_child(void)
1120 {
1121 
1122 	(void)_pthread_spin_destroy(&__workqueue_list_lock);
1123 	(void)_pthread_spin_init(&__workqueue_list_lock,
1124 	    PTHREAD_PROCESS_PRIVATE);
1125 	if (kernel_workq_setup != 0) {
1126 		kernel_workq_setup = 0;
1127 		_pthread_work_internal_init();
1128 	}
1129 	if (dispatch_atfork_child != 0)
1130 		dispatch_atfork_child();
1131 }
1132 
1133