1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 
22 /*
23  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 
27 #include <sys/cdefs.h>
28 /* __FBSDID("$FreeBSD: head/cddl/compat/opensolaris/misc/thread_pool.c 275595 2014-12-08 06:10:47Z delphij $"); */
29 
30 #include <stdlib.h>
31 #include <signal.h>
32 #include <errno.h>
33 #include "thread_pool_impl.h"
34 
35 typedef void (*_Voidfp)(void*); /* pointer to extern "C" function */
36 
37 static void
delete_pool(tpool_t * tpool)38 delete_pool(tpool_t *tpool)
39 {
40           tpool_job_t *job;
41 
42           /*
43            * There should be no pending jobs, but just in case...
44            */
45           for (job = tpool->tp_head; job != NULL; job = tpool->tp_head) {
46                     tpool->tp_head = job->tpj_next;
47                     free(job);
48           }
49           (void) pthread_attr_destroy(&tpool->tp_attr);
50           free(tpool);
51 }
52 
53 /*
54  * Worker thread is terminating.
55  */
56 static void
worker_cleanup(void * arg)57 worker_cleanup(void *arg)
58 {
59           tpool_t *tpool = arg;
60 
61           if (--tpool->tp_current == 0 &&
62               (tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
63                     if (tpool->tp_flags & TP_ABANDON) {
64                               pthread_mutex_unlock(&tpool->tp_mutex);
65                               delete_pool(tpool);
66                               return;
67                     }
68                     if (tpool->tp_flags & TP_DESTROY)
69                               (void) pthread_cond_broadcast(&tpool->tp_busycv);
70           }
71           pthread_mutex_unlock(&tpool->tp_mutex);
72 }
73 
74 static void
notify_waiters(tpool_t * tpool)75 notify_waiters(tpool_t *tpool)
76 {
77           if (tpool->tp_head == NULL && tpool->tp_active == NULL) {
78                     tpool->tp_flags &= ~TP_WAIT;
79                     (void) pthread_cond_broadcast(&tpool->tp_waitcv);
80           }
81 }
82 
83 /*
84  * Called by a worker thread on return from a tpool_dispatch()d job.
85  */
86 static void
job_cleanup(void * arg)87 job_cleanup(void *arg)
88 {
89           tpool_t *tpool = arg;
90           pthread_t my_tid = pthread_self();
91           tpool_active_t *activep;
92           tpool_active_t **activepp;
93 
94           pthread_mutex_lock(&tpool->tp_mutex);
95           /* CSTYLED */
96           for (activepp = &tpool->tp_active;; activepp = &activep->tpa_next) {
97                     activep = *activepp;
98                     if (activep->tpa_tid == my_tid) {
99                               *activepp = activep->tpa_next;
100                               break;
101                     }
102           }
103           if (tpool->tp_flags & TP_WAIT)
104                     notify_waiters(tpool);
105 }
106 
107 static void *
tpool_worker(void * arg)108 tpool_worker(void *arg)
109 {
110           tpool_t *tpool = (tpool_t *)arg;
111           int elapsed;
112           tpool_job_t *job;
113           void (*func)(void *);
114           tpool_active_t active;
115           sigset_t maskset;
116 
117           pthread_mutex_lock(&tpool->tp_mutex);
118           pthread_cleanup_push(worker_cleanup, tpool);
119 
120           /*
121            * This is the worker's main loop.
122            * It will only be left if a timeout or an error has occured.
123            */
124           active.tpa_tid = pthread_self();
125           for (;;) {
126                     elapsed = 0;
127                     tpool->tp_idle++;
128                     if (tpool->tp_flags & TP_WAIT)
129                               notify_waiters(tpool);
130                     while ((tpool->tp_head == NULL ||
131                         (tpool->tp_flags & TP_SUSPEND)) &&
132                         !(tpool->tp_flags & (TP_DESTROY | TP_ABANDON))) {
133                               if (tpool->tp_current <= tpool->tp_minimum ||
134                                   tpool->tp_linger == 0) {
135                                         (void) pthread_cond_wait(&tpool->tp_workcv,
136                                             &tpool->tp_mutex);
137                               } else {
138                                         struct timespec timeout;
139 
140                                         clock_gettime(CLOCK_MONOTONIC, &timeout);
141                                         timeout.tv_sec += tpool->tp_linger;
142                                         if (pthread_cond_timedwait(&tpool->tp_workcv,
143                                             &tpool->tp_mutex, &timeout) != 0) {
144                                                   elapsed = 1;
145                                                   break;
146                                         }
147                               }
148                     }
149                     tpool->tp_idle--;
150                     if (tpool->tp_flags & TP_DESTROY)
151                               break;
152                     if (tpool->tp_flags & TP_ABANDON) {
153                               /* can't abandon a suspended pool */
154                               if (tpool->tp_flags & TP_SUSPEND) {
155                                         tpool->tp_flags &= ~TP_SUSPEND;
156                                         (void) pthread_cond_broadcast(&tpool->tp_workcv);
157                               }
158                               if (tpool->tp_head == NULL)
159                                         break;
160                     }
161                     if ((job = tpool->tp_head) != NULL &&
162                         !(tpool->tp_flags & TP_SUSPEND)) {
163                               elapsed = 0;
164                               func = job->tpj_func;
165                               arg = job->tpj_arg;
166                               tpool->tp_head = job->tpj_next;
167                               if (job == tpool->tp_tail)
168                                         tpool->tp_tail = NULL;
169                               tpool->tp_njobs--;
170                               active.tpa_next = tpool->tp_active;
171                               tpool->tp_active = &active;
172                               pthread_mutex_unlock(&tpool->tp_mutex);
173                               pthread_cleanup_push(job_cleanup, tpool);
174                               free(job);
175                               /*
176                                * Call the specified function.
177                                */
178                               func(arg);
179                               /*
180                                * We don't know what this thread has been doing,
181                                * so we reset its signal mask and cancellation
182                                * state back to the initial values.
183                                */
184                               sigfillset(&maskset);
185                               (void) pthread_sigmask(SIG_SETMASK, &maskset, NULL);
186                               (void) pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,
187                                   NULL);
188                               (void) pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,
189                                   NULL);
190                               pthread_cleanup_pop(1);
191                     }
192                     if (elapsed && tpool->tp_current > tpool->tp_minimum) {
193                               /*
194                                * We timed out and there is no work to be done
195                                * and the number of workers exceeds the minimum.
196                                * Exit now to reduce the size of the pool.
197                                */
198                               break;
199                     }
200           }
201           pthread_cleanup_pop(1);
202           return (arg);
203 }
204 
205 /*
206  * Create a worker thread, with all signals blocked.
207  */
208 static int
create_worker(tpool_t * tpool)209 create_worker(tpool_t *tpool)
210 {
211           sigset_t maskset, oset;
212           pthread_t thread;
213           int error;
214 
215           sigfillset(&maskset);
216           (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
217           error = pthread_create(&thread, &tpool->tp_attr, tpool_worker, tpool);
218           (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
219           return (error);
220 }
221 
222 tpool_t   *
tpool_create(uint_t min_threads,uint_t max_threads,uint_t linger,pthread_attr_t * attr)223 tpool_create(uint_t min_threads, uint_t max_threads, uint_t linger,
224           pthread_attr_t *attr)
225 {
226           tpool_t   *tpool;
227           int error;
228 
229           if (min_threads > max_threads || max_threads < 1) {
230                     errno = EINVAL;
231                     return (NULL);
232           }
233 
234           tpool = calloc(1, sizeof (*tpool));
235           if (tpool == NULL) {
236                     errno = ENOMEM;
237                     return (NULL);
238           }
239           (void) pthread_mutex_init(&tpool->tp_mutex, NULL);
240           (void) pthread_cond_init(&tpool->tp_busycv, NULL);
241           (void) pthread_cond_init(&tpool->tp_workcv, NULL);
242           (void) pthread_cond_init(&tpool->tp_waitcv, NULL);
243           tpool->tp_minimum = min_threads;
244           tpool->tp_maximum = max_threads;
245           tpool->tp_linger = linger;
246 
247           /* make all pool threads be detached daemon threads */
248           (void) pthread_attr_init(&tpool->tp_attr);
249           (void) pthread_attr_setdetachstate(&tpool->tp_attr,
250               PTHREAD_CREATE_DETACHED);
251 
252           return (tpool);
253 }
254 
255 /*
256  * Dispatch a work request to the thread pool.
257  * If there are idle workers, awaken one.
258  * Else, if the maximum number of workers has
259  * not been reached, spawn a new worker thread.
260  * Else just return with the job added to the queue.
261  */
262 int
tpool_dispatch(tpool_t * tpool,void (* func)(void *),void * arg)263 tpool_dispatch(tpool_t *tpool, void (*func)(void *), void *arg)
264 {
265           tpool_job_t *job;
266 
267           if ((job = calloc(1, sizeof (*job))) == NULL)
268                     return (-1);
269           job->tpj_next = NULL;
270           job->tpj_func = func;
271           job->tpj_arg = arg;
272 
273           pthread_mutex_lock(&tpool->tp_mutex);
274 
275           if (tpool->tp_head == NULL)
276                     tpool->tp_head = job;
277           else
278                     tpool->tp_tail->tpj_next = job;
279           tpool->tp_tail = job;
280           tpool->tp_njobs++;
281 
282           if (!(tpool->tp_flags & TP_SUSPEND)) {
283                     if (tpool->tp_idle > 0)
284                               (void) pthread_cond_signal(&tpool->tp_workcv);
285                     else if (tpool->tp_current < tpool->tp_maximum &&
286                         create_worker(tpool) == 0)
287                               tpool->tp_current++;
288           }
289 
290           pthread_mutex_unlock(&tpool->tp_mutex);
291           return (0);
292 }
293 
294 /*
295  * Assumes: by the time tpool_destroy() is called no one will use this
296  * thread pool in any way and no one will try to dispatch entries to it.
297  * Calling tpool_destroy() from a job in the pool will cause deadlock.
298  */
299 void
tpool_destroy(tpool_t * tpool)300 tpool_destroy(tpool_t *tpool)
301 {
302           tpool_active_t *activep;
303 
304           pthread_mutex_lock(&tpool->tp_mutex);
305           pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
306 
307           /* mark the pool as being destroyed; wakeup idle workers */
308           tpool->tp_flags |= TP_DESTROY;
309           tpool->tp_flags &= ~TP_SUSPEND;
310           (void) pthread_cond_broadcast(&tpool->tp_workcv);
311 
312           /* cancel all active workers */
313           for (activep = tpool->tp_active; activep; activep = activep->tpa_next)
314                     (void) pthread_cancel(activep->tpa_tid);
315 
316           /* wait for all active workers to finish */
317           while (tpool->tp_active != NULL) {
318                     tpool->tp_flags |= TP_WAIT;
319                     (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
320           }
321 
322           /* the last worker to terminate will wake us up */
323           while (tpool->tp_current != 0)
324                     (void) pthread_cond_wait(&tpool->tp_busycv, &tpool->tp_mutex);
325 
326           pthread_cleanup_pop(1);       /* pthread_mutex_unlock(&tpool->tp_mutex); */
327           delete_pool(tpool);
328 }
329 
330 /*
331  * Like tpool_destroy(), but don't cancel workers or wait for them to finish.
332  * The last worker to terminate will delete the pool.
333  */
334 void
tpool_abandon(tpool_t * tpool)335 tpool_abandon(tpool_t *tpool)
336 {
337 
338           pthread_mutex_lock(&tpool->tp_mutex);
339           if (tpool->tp_current == 0) {
340                     /* no workers, just delete the pool */
341                     pthread_mutex_unlock(&tpool->tp_mutex);
342                     delete_pool(tpool);
343           } else {
344                     /* wake up all workers, last one will delete the pool */
345                     tpool->tp_flags |= TP_ABANDON;
346                     tpool->tp_flags &= ~TP_SUSPEND;
347                     (void) pthread_cond_broadcast(&tpool->tp_workcv);
348                     pthread_mutex_unlock(&tpool->tp_mutex);
349           }
350 }
351 
352 /*
353  * Wait for all jobs to complete.
354  * Calling tpool_wait() from a job in the pool will cause deadlock.
355  */
356 void
tpool_wait(tpool_t * tpool)357 tpool_wait(tpool_t *tpool)
358 {
359 
360           pthread_mutex_lock(&tpool->tp_mutex);
361           pthread_cleanup_push((_Voidfp)pthread_mutex_unlock, &tpool->tp_mutex);
362           while (tpool->tp_head != NULL || tpool->tp_active != NULL) {
363                     tpool->tp_flags |= TP_WAIT;
364                     (void) pthread_cond_wait(&tpool->tp_waitcv, &tpool->tp_mutex);
365           }
366           pthread_cleanup_pop(1);       /* pthread_mutex_unlock(&tpool->tp_mutex); */
367 }
368 
369 void
tpool_suspend(tpool_t * tpool)370 tpool_suspend(tpool_t *tpool)
371 {
372 
373           pthread_mutex_lock(&tpool->tp_mutex);
374           tpool->tp_flags |= TP_SUSPEND;
375           pthread_mutex_unlock(&tpool->tp_mutex);
376 }
377 
378 int
tpool_suspended(tpool_t * tpool)379 tpool_suspended(tpool_t *tpool)
380 {
381           int suspended;
382 
383           pthread_mutex_lock(&tpool->tp_mutex);
384           suspended = (tpool->tp_flags & TP_SUSPEND) != 0;
385           pthread_mutex_unlock(&tpool->tp_mutex);
386 
387           return (suspended);
388 }
389 
390 void
tpool_resume(tpool_t * tpool)391 tpool_resume(tpool_t *tpool)
392 {
393           int excess;
394 
395           pthread_mutex_lock(&tpool->tp_mutex);
396           if (!(tpool->tp_flags & TP_SUSPEND)) {
397                     pthread_mutex_unlock(&tpool->tp_mutex);
398                     return;
399           }
400           tpool->tp_flags &= ~TP_SUSPEND;
401           (void) pthread_cond_broadcast(&tpool->tp_workcv);
402           excess = tpool->tp_njobs - tpool->tp_idle;
403           while (excess-- > 0 && tpool->tp_current < tpool->tp_maximum) {
404                     if (create_worker(tpool) != 0)
405                               break;              /* pthread_create() failed */
406                     tpool->tp_current++;
407           }
408           pthread_mutex_unlock(&tpool->tp_mutex);
409 }
410 
411 int
tpool_member(tpool_t * tpool)412 tpool_member(tpool_t *tpool)
413 {
414           pthread_t my_tid = pthread_self();
415           tpool_active_t *activep;
416 
417           pthread_mutex_lock(&tpool->tp_mutex);
418           for (activep = tpool->tp_active; activep; activep = activep->tpa_next) {
419                     if (activep->tpa_tid == my_tid) {
420                               pthread_mutex_unlock(&tpool->tp_mutex);
421                               return (1);
422                     }
423           }
424           pthread_mutex_unlock(&tpool->tp_mutex);
425           return (0);
426 }
427