1 /*        $NetBSD: tpool.c,v 1.2 2021/08/14 16:14:56 christos Exp $   */
2 
3 /* $OpenLDAP$ */
4 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5  *
6  * Copyright 1998-2021 The OpenLDAP Foundation.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted only as authorized by the OpenLDAP
11  * Public License.
12  *
13  * A copy of this license is available in file LICENSE in the
14  * top-level directory of the distribution or, alternatively, at
15  * <http://www.OpenLDAP.org/license.html>.
16  */
17 
18 #include <sys/cdefs.h>
19 __RCSID("$NetBSD: tpool.c,v 1.2 2021/08/14 16:14:56 christos Exp $");
20 
21 #include "portable.h"
22 
23 #include <stdio.h>
24 
25 #include <ac/signal.h>
26 #include <ac/stdarg.h>
27 #include <ac/stdlib.h>
28 #include <ac/string.h>
29 #include <ac/time.h>
30 #include <ac/errno.h>
31 
32 #include "ldap-int.h"
33 
34 #ifdef LDAP_R_COMPILE
35 
36 #include "ldap_pvt_thread.h" /* Get the thread interface */
37 #include "ldap_queue.h"
38 #define LDAP_THREAD_POOL_IMPLEMENTATION
39 #include "ldap_thr_debug.h"  /* May rename symbols defined below */
40 
41 #ifndef LDAP_THREAD_HAVE_TPOOL
42 
43 #ifndef CACHELINE
44 #define CACHELINE   64
45 #endif
46 
/* Thread-specific key with data and optional free function.
 * This is the element type of the per-thread ltu_key[] array in
 * ldap_int_thread_userctx_t.
 */
typedef struct ldap_int_tpool_key_s {
	void *ltk_key;		/* the key */
	void *ltk_data;		/* data stored with the key */
	ldap_pvt_thread_pool_keyfree_t *ltk_free;	/* optional free function */
} ldap_int_tpool_key_t;
53 
/* Max number of thread-specific keys we store per thread.
 * We don't expect to use many...
 * (This is the size of ltu_key[] in ldap_int_thread_userctx_t.)
 */
#define	MAXKEYS	32

/* Max number of threads.
 * Also the number of slots in thread_keys[] and the upper bound accepted
 * for a pool's max_threads setting.
 */
#define	LDAP_MAXTHR	1024	/* must be a power of 2 */

/* (Theoretical) max number of pending requests.
 * A max_pending setting outside 1..MAX_PENDING is replaced by MAX_PENDING.
 */
#define MAX_PENDING (INT_MAX/2)	/* INT_MAX - (room to avoid overflow) */

/* pool->ltp_pause values */
enum { NOT_PAUSED = 0, WANT_PAUSE = 1, PAUSED = 2 };
67 
/* Context: thread ID and thread-specific key/data pairs.
 * The main thread has a statically allocated one (ldap_int_main_thrctx).
 */
typedef struct ldap_int_thread_userctx_s {
	/* work queue associated with this context -- presumably the queue
	 * the thread serves; its use is not visible in this part of the file
	 */
	struct ldap_int_thread_poolq_s *ltu_pq;
	ldap_pvt_thread_t ltu_id;	/* thread ID; key of the thread_keys[] hash */
	ldap_int_tpool_key_t ltu_key[MAXKEYS];	/* thread-specific key slots */
} ldap_int_thread_userctx_t;
74 
75 
/* Simple {thread ID -> context} hash table; key=ctx->ltu_id.
 * Protected by ldap_pvt_thread_pool_mutex.
 * The table has LDAP_MAXTHR slots.
 */
static struct {
	ldap_int_thread_userctx_t *ctx;
	/* ctx is valid when not NULL or DELETED_THREAD_CTX */
	/* DELETED_THREAD_CTX is a dummy address (one past the main thread's
	 * context object) that serves as a slot marker distinct from NULL.
	 */
#	define DELETED_THREAD_CTX (&ldap_int_main_thrctx + 1) /* dummy addr */
} thread_keys[LDAP_MAXTHR];
84 
/* Hash the object representation of (tid) into (hash).
 * The hash starts as the first byte of tid; every following byte b is
 * folded in as  hash += (hash << 5) ^ b.
 * (tid) must be an lvalue, (hash) an unsigned integer lvalue; (hash) is
 * evaluated more than once.  Callers reduce the result to a table index
 * themselves.
 */
#define	TID_HASH(tid, hash) do { \
	const unsigned char *tid_bytes_ = (const unsigned char *)&(tid); \
	unsigned tid_pos_ = 1; \
	(hash) = tid_bytes_[0]; \
	while (tid_pos_ < sizeof(tid)) { \
		(hash) += ((hash) << 5) ^ tid_bytes_[tid_pos_]; \
		tid_pos_++; \
	} \
} while(0)
91 
92 
/* Task for a thread to perform.
 * A task object is either on its queue's pending list or on that queue's
 * free list of unused task objects; it is never on both, which is why the
 * two link fields can share storage.
 */
typedef struct ldap_int_thread_task_s {
	union {
		LDAP_STAILQ_ENTRY(ldap_int_thread_task_s) q;	/* link on ltp_pending_list */
		LDAP_SLIST_ENTRY(ldap_int_thread_task_s) l;	/* link on ltp_free_list */
	} ltt_next;
	ldap_pvt_thread_start_t *ltt_start_routine;	/* function to run; no_task once retracted */
	void *ltt_arg;					/* argument given at submit time */
	struct ldap_int_thread_poolq_s *ltt_queue;	/* queue the task was submitted to */
} ldap_int_thread_task_t;

/* Head of a tail queue of tasks (type of the pending lists) */
typedef LDAP_STAILQ_HEAD(tcq, ldap_int_thread_task_s) ldap_int_tpool_plist_t;
105 
/* One work queue of a pool.  Each queue has its own mutex, condition
 * variable, task lists and counters; a pool owns ltp_numqs of them.
 */
struct ldap_int_thread_poolq_s {
	/* Address returned by the allocator for this queue.  The struct itself
	 * is placed at the next CACHELINE-aligned address inside that
	 * allocation, so this pointer (not the struct's address) is the one
	 * handed to LDAP_FREE.
	 */
	void *ltp_free;

	/* pool this queue belongs to */
	struct ldap_int_thread_pool_s *ltp_pool;

	/* protect members below */
	ldap_pvt_thread_mutex_t ltp_mutex;

	/* not paused and something to do for pool_<wrapper/pause/destroy>()
	 * Used for normal pool operation, to synch between submitter and
	 * worker threads. Not used for pauses. In normal operation multiple
	 * queues can rendezvous without acquiring the main pool lock.
	 */
	ldap_pvt_thread_cond_t ltp_cond;

	/* ltp_pause == 0 ? &ltp_pending_list : &empty_pending_list,
	 * maintained to reduce work for pool_wrapper()
	 */
	ldap_int_tpool_plist_t *ltp_work_list;

	/* pending tasks, and unused task objects */
	ldap_int_tpool_plist_t ltp_pending_list;
	LDAP_SLIST_HEAD(tcl, ldap_int_thread_task_s) ltp_free_list;

	/* Max number of threads in this queue (this queue's share of the
	 * pool-wide ltp_max_count)
	 */
	int ltp_max_count;

	/* Max pending + paused + idle tasks, negated when ltp_finishing */
	int ltp_max_pending;

	int ltp_pending_count;		/* Pending + paused + idle tasks */
	int ltp_active_count;		/* Active, not paused/idle tasks */
	int ltp_open_count;		/* Number of threads */
	int ltp_starting;		/* Currently starting threads */
};
141 
/* A thread pool: pool-wide limits and pause state plus an array of
 * work queues that hold the actual tasks and threads.
 */
struct ldap_int_thread_pool_s {
	/* link on ldap_int_thread_pool_list */
	LDAP_STAILQ_ENTRY(ldap_int_thread_pool_s) ltp_next;

	/* array of ltp_numqs pointers to the work queues */
	struct ldap_int_thread_poolq_s **ltp_wqs;

	/* number of poolqs */
	int ltp_numqs;

	/* protect members below */
	ldap_pvt_thread_mutex_t ltp_mutex;

	/* paused and waiting for resume
	 * When a pause is in effect all workers switch to waiting on
	 * this cond instead of their per-queue cond.
	 */
	ldap_pvt_thread_cond_t ltp_cond;

	/* ltp_active_queues < 1 && ltp_pause */
	ldap_pvt_thread_cond_t ltp_pcond;

	/* number of active queues */
	int ltp_active_queues;

	/* The pool is finishing, waiting for its threads to close.
	 * They close when ltp_pending_list is done.  pool_submit()
	 * rejects new tasks.  ltp_max_pending = -(its old value).
	 */
	int ltp_finishing;

	/* Some active task needs to be the sole active task.
	 * Atomic variable so ldap_pvt_thread_pool_pausing() can read it.
	 * Holds NOT_PAUSED, WANT_PAUSE or PAUSED.
	 */
	volatile sig_atomic_t ltp_pause;

	/* Max number of threads in pool (effective value; LDAP_MAXTHR when
	 * ltp_conf_max_count is 0)
	 */
	int ltp_max_count;

	/* Configured max number of threads in pool, 0 for default (LDAP_MAXTHR) */
	int ltp_conf_max_count;

	/* Max pending + paused + idle tasks, negated when ltp_finishing */
	int ltp_max_pending;
};
185 
/* A pending list that always stays empty; per the ltp_work_list comment
 * in struct ldap_int_thread_poolq_s, a queue's ltp_work_list points here
 * instead of at its real pending list while ltp_pause is nonzero.
 */
static ldap_int_tpool_plist_t empty_pending_list =
	LDAP_STAILQ_HEAD_INITIALIZER(empty_pending_list);

/* Set to 1 once a pool has been created; pool_init_q() asserts that it
 * is still 0 because multiple pools are not supported.
 */
static int ldap_int_has_thread_pool = 0;

/* All existing pools; modified under ldap_pvt_thread_pool_mutex */
static LDAP_STAILQ_HEAD(tpq, ldap_int_thread_pool_s)
	ldap_int_thread_pool_list =
	LDAP_STAILQ_HEAD_INITIALIZER(ldap_int_thread_pool_list);

/* Module-wide lock: guards thread_keys[] and ldap_int_thread_pool_list.
 * Initialized in pool_startup(), destroyed in pool_shutdown().
 */
static ldap_pvt_thread_mutex_t ldap_pvt_thread_pool_mutex;

/* Thread entry point passed to ldap_pvt_thread_create() for pool workers;
 * pool_submit2() passes the work queue as its argument.
 */
static void *ldap_int_thread_pool_wrapper( void *pool );

/* Thread-specific data key, created in pool_startup() */
static ldap_pvt_thread_key_t	ldap_tpool_key;

/* Context of the main thread */
static ldap_int_thread_userctx_t ldap_int_main_thrctx;
202 
203 int
ldap_int_thread_pool_startup(void)204 ldap_int_thread_pool_startup ( void )
205 {
206           ldap_int_main_thrctx.ltu_id = ldap_pvt_thread_self();
207           ldap_pvt_thread_key_create( &ldap_tpool_key );
208           return ldap_pvt_thread_mutex_init(&ldap_pvt_thread_pool_mutex);
209 }
210 
211 int
ldap_int_thread_pool_shutdown(void)212 ldap_int_thread_pool_shutdown ( void )
213 {
214           struct ldap_int_thread_pool_s *pool;
215 
216           while ((pool = LDAP_STAILQ_FIRST(&ldap_int_thread_pool_list)) != NULL) {
217                     (ldap_pvt_thread_pool_destroy)(&pool, 0); /* ignore thr_debug macro */
218           }
219           ldap_pvt_thread_mutex_destroy(&ldap_pvt_thread_pool_mutex);
220           ldap_pvt_thread_key_destroy( ldap_tpool_key );
221           return(0);
222 }
223 
224 
225 /* Create a thread pool */
226 int
ldap_pvt_thread_pool_init_q(ldap_pvt_thread_pool_t * tpool,int max_threads,int max_pending,int numqs)227 ldap_pvt_thread_pool_init_q (
228           ldap_pvt_thread_pool_t *tpool,
229           int max_threads,
230           int max_pending,
231           int numqs )
232 {
233           ldap_pvt_thread_pool_t pool;
234           struct ldap_int_thread_poolq_s *pq;
235           int i, rc, rem_thr, rem_pend;
236 
237           /* multiple pools are currently not supported (ITS#4943) */
238           assert(!ldap_int_has_thread_pool);
239 
240           if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
241                     max_threads = 0;
242           if (! (1 <= max_pending && max_pending <= MAX_PENDING))
243                     max_pending = MAX_PENDING;
244 
245           *tpool = NULL;
246           pool = (ldap_pvt_thread_pool_t) LDAP_CALLOC(1,
247                     sizeof(struct ldap_int_thread_pool_s));
248 
249           if (pool == NULL) return(-1);
250 
251           pool->ltp_wqs = LDAP_MALLOC(numqs * sizeof(struct ldap_int_thread_poolq_s *));
252           if (pool->ltp_wqs == NULL) {
253                     LDAP_FREE(pool);
254                     return(-1);
255           }
256 
257           for (i=0; i<numqs; i++) {
258                     char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
259                     if (ptr == NULL) {
260                               for (--i; i>=0; i--)
261                                         LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
262                               LDAP_FREE(pool->ltp_wqs);
263                               LDAP_FREE(pool);
264                               return(-1);
265                     }
266                     pool->ltp_wqs[i] = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
267                     pool->ltp_wqs[i]->ltp_free = ptr;
268           }
269 
270           pool->ltp_numqs = numqs;
271           pool->ltp_conf_max_count = max_threads;
272           if ( !max_threads )
273                     max_threads = LDAP_MAXTHR;
274 
275           rc = ldap_pvt_thread_mutex_init(&pool->ltp_mutex);
276           if (rc != 0) {
277 fail:
278                     for (i=0; i<numqs; i++)
279                               LDAP_FREE(pool->ltp_wqs[i]->ltp_free);
280                     LDAP_FREE(pool->ltp_wqs);
281                     LDAP_FREE(pool);
282                     return(rc);
283           }
284 
285           rc = ldap_pvt_thread_cond_init(&pool->ltp_cond);
286           if (rc != 0)
287                     goto fail;
288 
289           rc = ldap_pvt_thread_cond_init(&pool->ltp_pcond);
290           if (rc != 0)
291                     goto fail;
292 
293           rem_thr = max_threads % numqs;
294           rem_pend = max_pending % numqs;
295           for ( i=0; i<numqs; i++ ) {
296                     pq = pool->ltp_wqs[i];
297                     pq->ltp_pool = pool;
298                     rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
299                     if (rc != 0)
300                               return(rc);
301                     rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
302                     if (rc != 0)
303                               return(rc);
304                     LDAP_STAILQ_INIT(&pq->ltp_pending_list);
305                     pq->ltp_work_list = &pq->ltp_pending_list;
306                     LDAP_SLIST_INIT(&pq->ltp_free_list);
307 
308                     pq->ltp_max_count = max_threads / numqs;
309                     if ( rem_thr ) {
310                               pq->ltp_max_count++;
311                               rem_thr--;
312                     }
313                     pq->ltp_max_pending = max_pending / numqs;
314                     if ( rem_pend ) {
315                               pq->ltp_max_pending++;
316                               rem_pend--;
317                     }
318           }
319 
320           ldap_int_has_thread_pool = 1;
321 
322           pool->ltp_max_count = max_threads;
323           pool->ltp_max_pending = max_pending;
324 
325           ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
326           LDAP_STAILQ_INSERT_TAIL(&ldap_int_thread_pool_list, pool, ltp_next);
327           ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
328 
329           /* Start no threads just yet.  That can break if the process forks
330            * later, as slapd does in order to daemonize.  On at least POSIX,
331            * only the forking thread would survive in the child.  Yet fork()
332            * can't unlock/clean up other threads' locks and data structures,
333            * unless pthread_atfork() handlers have been set up to do so.
334            */
335 
336           *tpool = pool;
337           return(0);
338 }
339 
340 int
ldap_pvt_thread_pool_init(ldap_pvt_thread_pool_t * tpool,int max_threads,int max_pending)341 ldap_pvt_thread_pool_init (
342           ldap_pvt_thread_pool_t *tpool,
343           int max_threads,
344           int max_pending )
345 {
346           return ldap_pvt_thread_pool_init_q( tpool, max_threads, max_pending, 1 );
347 }
348 
349 /* Submit a task to be performed by the thread pool */
350 int
ldap_pvt_thread_pool_submit(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start_routine,void * arg)351 ldap_pvt_thread_pool_submit (
352           ldap_pvt_thread_pool_t *tpool,
353           ldap_pvt_thread_start_t *start_routine, void *arg )
354 {
355           return ldap_pvt_thread_pool_submit2( tpool, start_routine, arg, NULL );
356 }
357 
358 /* Submit a task to be performed by the thread pool */
359 int
ldap_pvt_thread_pool_submit2(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start_routine,void * arg,void ** cookie)360 ldap_pvt_thread_pool_submit2 (
361           ldap_pvt_thread_pool_t *tpool,
362           ldap_pvt_thread_start_t *start_routine, void *arg,
363           void **cookie )
364 {
365           struct ldap_int_thread_pool_s *pool;
366           struct ldap_int_thread_poolq_s *pq;
367           ldap_int_thread_task_t *task;
368           ldap_pvt_thread_t thr;
369           int i, j;
370 
371           if (tpool == NULL)
372                     return(-1);
373 
374           pool = *tpool;
375 
376           if (pool == NULL)
377                     return(-1);
378 
379           if ( pool->ltp_numqs > 1 ) {
380                     int min = pool->ltp_wqs[0]->ltp_max_pending + pool->ltp_wqs[0]->ltp_max_count;
381                     int min_x = 0, cnt;
382                     for ( i = 0; i < pool->ltp_numqs; i++ ) {
383                               /* take first queue that has nothing active */
384                               if ( !pool->ltp_wqs[i]->ltp_active_count ) {
385                                         min_x = i;
386                                         break;
387                               }
388                               cnt = pool->ltp_wqs[i]->ltp_active_count + pool->ltp_wqs[i]->ltp_pending_count;
389                               if ( cnt < min ) {
390                                         min = cnt;
391                                         min_x = i;
392                               }
393                     }
394                     i = min_x;
395           } else
396                     i = 0;
397 
398           j = i;
399           while(1) {
400                     ldap_pvt_thread_mutex_lock(&pool->ltp_wqs[i]->ltp_mutex);
401                     if (pool->ltp_wqs[i]->ltp_pending_count < pool->ltp_wqs[i]->ltp_max_pending) {
402                               break;
403                     }
404                     ldap_pvt_thread_mutex_unlock(&pool->ltp_wqs[i]->ltp_mutex);
405                     i++;
406                     i %= pool->ltp_numqs;
407                     if ( i == j )
408                               return -1;
409           }
410 
411           pq = pool->ltp_wqs[i];
412           task = LDAP_SLIST_FIRST(&pq->ltp_free_list);
413           if (task) {
414                     LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
415           } else {
416                     task = (ldap_int_thread_task_t *) LDAP_MALLOC(sizeof(*task));
417                     if (task == NULL)
418                               goto failed;
419           }
420 
421           task->ltt_start_routine = start_routine;
422           task->ltt_arg = arg;
423           task->ltt_queue = pq;
424           if ( cookie )
425                     *cookie = task;
426 
427           pq->ltp_pending_count++;
428           LDAP_STAILQ_INSERT_TAIL(&pq->ltp_pending_list, task, ltt_next.q);
429 
430           if (pool->ltp_pause)
431                     goto done;
432 
433           /* should we open (create) a thread? */
434           if (pq->ltp_open_count < pq->ltp_active_count+pq->ltp_pending_count &&
435                     pq->ltp_open_count < pq->ltp_max_count)
436           {
437                     pq->ltp_starting++;
438                     pq->ltp_open_count++;
439 
440                     if (0 != ldap_pvt_thread_create(
441                               &thr, 1, ldap_int_thread_pool_wrapper, pq))
442                     {
443                               /* couldn't create thread.  back out of
444                                * ltp_open_count and check for even worse things.
445                                */
446                               pq->ltp_starting--;
447                               pq->ltp_open_count--;
448 
449                               if (pq->ltp_open_count == 0) {
450                                         /* no open threads at all?!?
451                                          */
452                                         ldap_int_thread_task_t *ptr;
453 
454                                         /* let pool_close know there are no more threads */
455                                         ldap_pvt_thread_cond_signal(&pq->ltp_cond);
456 
457                                         LDAP_STAILQ_FOREACH(ptr, &pq->ltp_pending_list, ltt_next.q)
458                                                   if (ptr == task) break;
459                                         if (ptr == task) {
460                                                   /* no open threads, task not handled, so
461                                                    * back out of ltp_pending_count, free the task,
462                                                    * report the error.
463                                                    */
464                                                   pq->ltp_pending_count--;
465                                                   LDAP_STAILQ_REMOVE(&pq->ltp_pending_list, task,
466                                                             ldap_int_thread_task_s, ltt_next.q);
467                                                   LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task,
468                                                             ltt_next.l);
469                                                   goto failed;
470                                         }
471                               }
472                               /* there is another open thread, so this
473                                * task will be handled eventually.
474                                */
475                     }
476           }
477           ldap_pvt_thread_cond_signal(&pq->ltp_cond);
478 
479  done:
480           ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
481           return(0);
482 
483  failed:
484           ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
485           return(-1);
486 }
487 
/* Do-nothing start routine that replaces the routine of a retracted
 * task.  Ignores both arguments and returns NULL.
 */
static void *
no_task( void *ctx, void *arg )
{
	(void)ctx;
	(void)arg;
	return NULL;
}
493 
494 /* Cancel a pending task that was previously submitted.
495  * Return 1 if the task was successfully cancelled, 0 if
496  * not found, -1 for invalid parameters
497  */
498 int
ldap_pvt_thread_pool_retract(void * cookie)499 ldap_pvt_thread_pool_retract (
500           void *cookie )
501 {
502           ldap_int_thread_task_t *task, *ttmp;
503           struct ldap_int_thread_poolq_s *pq;
504 
505           if (cookie == NULL)
506                     return(-1);
507 
508           ttmp = cookie;
509           pq = ttmp->ltt_queue;
510           if (pq == NULL)
511                     return(-1);
512 
513           ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
514           LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q)
515                     if (task == ttmp) {
516                               /* Could LDAP_STAILQ_REMOVE the task, but that
517                                * walks ltp_pending_list again to find it.
518                                */
519                               task->ltt_start_routine = no_task;
520                               task->ltt_arg = NULL;
521                               break;
522                     }
523           ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
524           return task != NULL;
525 }
526 
527 /* Walk the pool and allow tasks to be retracted, only to be called while the
528  * pool is paused */
529 int
ldap_pvt_thread_pool_walk(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_start_t * start,ldap_pvt_thread_walk_t * cb,void * arg)530 ldap_pvt_thread_pool_walk(
531           ldap_pvt_thread_pool_t *tpool,
532           ldap_pvt_thread_start_t *start,
533           ldap_pvt_thread_walk_t *cb, void *arg )
534 {
535           struct ldap_int_thread_pool_s *pool;
536           struct ldap_int_thread_poolq_s *pq;
537           ldap_int_thread_task_t *task;
538           int i;
539 
540           if (tpool == NULL)
541                     return(-1);
542 
543           pool = *tpool;
544 
545           if (pool == NULL)
546                     return(-1);
547 
548           ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
549           assert(pool->ltp_pause == PAUSED);
550           ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
551 
552           for (i=0; i<pool->ltp_numqs; i++) {
553                     pq = pool->ltp_wqs[i];
554                     LDAP_STAILQ_FOREACH(task, &pq->ltp_pending_list, ltt_next.q) {
555                               if ( task->ltt_start_routine == start ) {
556                                         if ( cb( task->ltt_start_routine, task->ltt_arg, arg ) ) {
557                                                   /* retract */
558                                                   task->ltt_start_routine = no_task;
559                                                   task->ltt_arg = NULL;
560                                         }
561                               }
562                     }
563           }
564           return 0;
565 }
566 
567 /* Set number of work queues in this pool. Should not be
568  * more than the number of CPUs. */
569 int
ldap_pvt_thread_pool_queues(ldap_pvt_thread_pool_t * tpool,int numqs)570 ldap_pvt_thread_pool_queues(
571           ldap_pvt_thread_pool_t *tpool,
572           int numqs )
573 {
574           struct ldap_int_thread_pool_s *pool;
575           struct ldap_int_thread_poolq_s *pq;
576           int i, rc, rem_thr, rem_pend;
577 
578           if (numqs < 1 || tpool == NULL)
579                     return(-1);
580 
581           pool = *tpool;
582 
583           if (pool == NULL)
584                     return(-1);
585 
586           if (numqs < pool->ltp_numqs) {
587                     for (i=numqs; i<pool->ltp_numqs; i++)
588                               pool->ltp_wqs[i]->ltp_max_count = 0;
589           } else if (numqs > pool->ltp_numqs) {
590                     struct ldap_int_thread_poolq_s **wqs;
591                     wqs = LDAP_REALLOC(pool->ltp_wqs, numqs * sizeof(struct ldap_int_thread_poolq_s *));
592                     if (wqs == NULL)
593                               return(-1);
594                     pool->ltp_wqs = wqs;
595                     for (i=pool->ltp_numqs; i<numqs; i++) {
596                               char *ptr = LDAP_CALLOC(1, sizeof(struct ldap_int_thread_poolq_s) + CACHELINE-1);
597                               if (ptr == NULL) {
598                                         for (; i<numqs; i++)
599                                                   pool->ltp_wqs[i] = NULL;
600                                         return(-1);
601                               }
602                               pq = (struct ldap_int_thread_poolq_s *)(((size_t)ptr + CACHELINE-1) & ~(CACHELINE-1));
603                               pq->ltp_free = ptr;
604                               pool->ltp_wqs[i] = pq;
605                               pq->ltp_pool = pool;
606                               rc = ldap_pvt_thread_mutex_init(&pq->ltp_mutex);
607                               if (rc != 0)
608                                         return(rc);
609                               rc = ldap_pvt_thread_cond_init(&pq->ltp_cond);
610                               if (rc != 0)
611                                         return(rc);
612                               LDAP_STAILQ_INIT(&pq->ltp_pending_list);
613                               pq->ltp_work_list = &pq->ltp_pending_list;
614                               LDAP_SLIST_INIT(&pq->ltp_free_list);
615                     }
616           }
617           rem_thr = pool->ltp_max_count % numqs;
618           rem_pend = pool->ltp_max_pending % numqs;
619           for ( i=0; i<numqs; i++ ) {
620                     pq = pool->ltp_wqs[i];
621                     pq->ltp_max_count = pool->ltp_max_count / numqs;
622                     if ( rem_thr ) {
623                               pq->ltp_max_count++;
624                               rem_thr--;
625                     }
626                     pq->ltp_max_pending = pool->ltp_max_pending / numqs;
627                     if ( rem_pend ) {
628                               pq->ltp_max_pending++;
629                               rem_pend--;
630                     }
631           }
632           pool->ltp_numqs = numqs;
633           return 0;
634 }
635 
636 /* Set max #threads.  value <= 0 means max supported #threads (LDAP_MAXTHR) */
637 int
ldap_pvt_thread_pool_maxthreads(ldap_pvt_thread_pool_t * tpool,int max_threads)638 ldap_pvt_thread_pool_maxthreads(
639           ldap_pvt_thread_pool_t *tpool,
640           int max_threads )
641 {
642           struct ldap_int_thread_pool_s *pool;
643           struct ldap_int_thread_poolq_s *pq;
644           int remthr, i;
645 
646           if (! (0 <= max_threads && max_threads <= LDAP_MAXTHR))
647                     max_threads = 0;
648 
649           if (tpool == NULL)
650                     return(-1);
651 
652           pool = *tpool;
653 
654           if (pool == NULL)
655                     return(-1);
656 
657           pool->ltp_conf_max_count = max_threads;
658           if ( !max_threads )
659                     max_threads = LDAP_MAXTHR;
660           pool->ltp_max_count = max_threads;
661 
662           remthr = max_threads % pool->ltp_numqs;
663           max_threads /= pool->ltp_numqs;
664 
665           for (i=0; i<pool->ltp_numqs; i++) {
666                     pq = pool->ltp_wqs[i];
667                     pq->ltp_max_count = max_threads;
668                     if (remthr) {
669                               pq->ltp_max_count++;
670                               remthr--;
671                     }
672           }
673           return(0);
674 }
675 
676 /* Inspect the pool */
677 int
ldap_pvt_thread_pool_query(ldap_pvt_thread_pool_t * tpool,ldap_pvt_thread_pool_param_t param,void * value)678 ldap_pvt_thread_pool_query(
679           ldap_pvt_thread_pool_t *tpool,
680           ldap_pvt_thread_pool_param_t param,
681           void *value )
682 {
683           struct ldap_int_thread_pool_s *pool;
684           int                                     count = -1;
685 
686           if ( tpool == NULL || value == NULL ) {
687                     return -1;
688           }
689 
690           pool = *tpool;
691 
692           if ( pool == NULL ) {
693                     return 0;
694           }
695 
696           switch ( param ) {
697           case LDAP_PVT_THREAD_POOL_PARAM_MAX:
698                     count = pool->ltp_conf_max_count;
699                     break;
700 
701           case LDAP_PVT_THREAD_POOL_PARAM_MAX_PENDING:
702                     count = pool->ltp_max_pending;
703                     if (count < 0)
704                               count = -count;
705                     if (count == MAX_PENDING)
706                               count = 0;
707                     break;
708 
709           case LDAP_PVT_THREAD_POOL_PARAM_PAUSING:
710                     ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
711                     count = (pool->ltp_pause != 0);
712                     ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
713                     break;
714 
715           case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
716           case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
717           case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
718           case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
719           case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
720                     {
721                               int i;
722                               count = 0;
723                               for (i=0; i<pool->ltp_numqs; i++) {
724                                         struct ldap_int_thread_poolq_s *pq = pool->ltp_wqs[i];
725                                         ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
726                                         switch(param) {
727                                                   case LDAP_PVT_THREAD_POOL_PARAM_OPEN:
728                                                             count += pq->ltp_open_count;
729                                                             break;
730                                                   case LDAP_PVT_THREAD_POOL_PARAM_STARTING:
731                                                             count += pq->ltp_starting;
732                                                             break;
733                                                   case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE:
734                                                             count += pq->ltp_active_count;
735                                                             break;
736                                                   case LDAP_PVT_THREAD_POOL_PARAM_PENDING:
737                                                             count += pq->ltp_pending_count;
738                                                             break;
739                                                   case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD:
740                                                             count += pq->ltp_pending_count + pq->ltp_active_count;
741                                                             break;
742                                                   default:
743                                                             break;
744                                         }
745                                         ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
746                               }
747                               if (count < 0)
748                                         count = -count;
749                     }
750                     break;
751 
752           case LDAP_PVT_THREAD_POOL_PARAM_ACTIVE_MAX:
753                     break;
754 
755           case LDAP_PVT_THREAD_POOL_PARAM_PENDING_MAX:
756                     break;
757 
758           case LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD_MAX:
759                     break;
760 
761           case LDAP_PVT_THREAD_POOL_PARAM_STATE:
762                     if (pool->ltp_pause)
763                               *((char **)value) = "pausing";
764                     else if (!pool->ltp_finishing)
765                               *((char **)value) = "running";
766                     else {
767                               int i;
768                               for (i=0; i<pool->ltp_numqs; i++)
769                                         if (pool->ltp_wqs[i]->ltp_pending_count) break;
770                               if (i<pool->ltp_numqs)
771                                         *((char **)value) = "finishing";
772                               else
773                                         *((char **)value) = "stopping";
774                     }
775                     break;
776 
777           case LDAP_PVT_THREAD_POOL_PARAM_UNKNOWN:
778                     break;
779           }
780 
781           if ( count > -1 ) {
782                     *((int *)value) = count;
783           }
784 
785           return ( count == -1 ? -1 : 0 );
786 }
787 
788 /*
789  * true if pool is pausing; does not lock any mutex to check.
790  * 0 if not pause, 1 if pause, -1 if error or no pool.
791  */
792 int
ldap_pvt_thread_pool_pausing(ldap_pvt_thread_pool_t * tpool)793 ldap_pvt_thread_pool_pausing( ldap_pvt_thread_pool_t *tpool )
794 {
795           int rc = -1;
796           struct ldap_int_thread_pool_s *pool;
797 
798           if ( tpool != NULL && (pool = *tpool) != NULL ) {
799                     rc = (pool->ltp_pause != 0);
800           }
801 
802           return rc;
803 }
804 
805 /*
806  * wrapper for ldap_pvt_thread_pool_query(), left around
807  * for backwards compatibility
808  */
809 int
ldap_pvt_thread_pool_backload(ldap_pvt_thread_pool_t * tpool)810 ldap_pvt_thread_pool_backload ( ldap_pvt_thread_pool_t *tpool )
811 {
812           int       rc, count;
813 
814           rc = ldap_pvt_thread_pool_query( tpool,
815                     LDAP_PVT_THREAD_POOL_PARAM_BACKLOAD, (void *)&count );
816 
817           if ( rc == 0 ) {
818                     return count;
819           }
820 
821           return rc;
822 }
823 
824 
825 /*
826  * wrapper for ldap_pvt_thread_pool_close+free(), left around
827  * for backwards compatibility
828  */
829 int
ldap_pvt_thread_pool_destroy(ldap_pvt_thread_pool_t * tpool,int run_pending)830 ldap_pvt_thread_pool_destroy ( ldap_pvt_thread_pool_t *tpool, int run_pending )
831 {
832           int rc;
833 
834           if ( (rc = ldap_pvt_thread_pool_close( tpool, run_pending )) ) {
835                     return rc;
836           }
837 
838           return ldap_pvt_thread_pool_free( tpool );
839 }
840 
841 /* Shut down the pool making its threads finish */
842 int
ldap_pvt_thread_pool_close(ldap_pvt_thread_pool_t * tpool,int run_pending)843 ldap_pvt_thread_pool_close ( ldap_pvt_thread_pool_t *tpool, int run_pending )
844 {
845           struct ldap_int_thread_pool_s *pool, *pptr;
846           struct ldap_int_thread_poolq_s *pq;
847           ldap_int_thread_task_t *task;
848           int i;
849 
850           if (tpool == NULL)
851                     return(-1);
852 
853           pool = *tpool;
854 
855           if (pool == NULL) return(-1);
856 
857           ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
858           LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
859                     if (pptr == pool) break;
860           ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
861 
862           if (pool != pptr) return(-1);
863 
864           ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
865 
866           pool->ltp_finishing = 1;
867           if (pool->ltp_max_pending > 0)
868                     pool->ltp_max_pending = -pool->ltp_max_pending;
869 
870           ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
871           ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
872 
873           for (i=0; i<pool->ltp_numqs; i++) {
874                     pq = pool->ltp_wqs[i];
875                     ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
876                     if (pq->ltp_max_pending > 0)
877                               pq->ltp_max_pending = -pq->ltp_max_pending;
878                     if (!run_pending) {
879                               while ((task = LDAP_STAILQ_FIRST(&pq->ltp_pending_list)) != NULL) {
880                                         LDAP_STAILQ_REMOVE_HEAD(&pq->ltp_pending_list, ltt_next.q);
881                                         LDAP_FREE(task);
882                               }
883                               pq->ltp_pending_count = 0;
884                     }
885 
886                     while (pq->ltp_open_count) {
887                               ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
888                               ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
889                     }
890 
891                     while ((task = LDAP_SLIST_FIRST(&pq->ltp_free_list)) != NULL)
892                     {
893                               LDAP_SLIST_REMOVE_HEAD(&pq->ltp_free_list, ltt_next.l);
894                               LDAP_FREE(task);
895                     }
896                     ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
897           }
898 
899           return(0);
900 }
901 
902 /* Destroy the pool, everything must have already shut down */
903 int
ldap_pvt_thread_pool_free(ldap_pvt_thread_pool_t * tpool)904 ldap_pvt_thread_pool_free ( ldap_pvt_thread_pool_t *tpool )
905 {
906           struct ldap_int_thread_pool_s *pool, *pptr;
907           struct ldap_int_thread_poolq_s *pq;
908           int i;
909 
910           if (tpool == NULL)
911                     return(-1);
912 
913           pool = *tpool;
914 
915           if (pool == NULL) return(-1);
916 
917           ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
918           LDAP_STAILQ_FOREACH(pptr, &ldap_int_thread_pool_list, ltp_next)
919                     if (pptr == pool) break;
920           if (pptr == pool)
921                     LDAP_STAILQ_REMOVE(&ldap_int_thread_pool_list, pool,
922                               ldap_int_thread_pool_s, ltp_next);
923           ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
924 
925           if (pool != pptr) return(-1);
926 
927           ldap_pvt_thread_cond_destroy(&pool->ltp_pcond);
928           ldap_pvt_thread_cond_destroy(&pool->ltp_cond);
929           ldap_pvt_thread_mutex_destroy(&pool->ltp_mutex);
930           for (i=0; i<pool->ltp_numqs; i++) {
931                     pq = pool->ltp_wqs[i];
932 
933                     assert( !pq->ltp_open_count );
934                     assert( LDAP_SLIST_EMPTY(&pq->ltp_free_list) );
935                     ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
936                     ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
937                     if (pq->ltp_free) {
938                               LDAP_FREE(pq->ltp_free);
939                     }
940           }
941           LDAP_FREE(pool->ltp_wqs);
942           LDAP_FREE(pool);
943           *tpool = NULL;
944           ldap_int_has_thread_pool = 0;
945           return(0);
946 }
947 
948 /* Thread loop.  Accept and handle submitted tasks. */
/*
 * xpool is the struct ldap_int_thread_poolq_s this thread serves.
 *
 * Outline:
 *  1. Build a stack-resident user context (ctx), store it as this
 *     thread's data under ldap_tpool_key, and register it in
 *     thread_keys[].
 *  2. Loop: take the first task from pq->ltp_work_list, run it with
 *     pq->ltp_mutex released, then put the task structure on
 *     pq->ltp_free_list.  With no task available, wait on a
 *     condition variable.
 *  3. Leave the loop when the pool is finishing or the queue has more
 *     open threads than ltp_max_count; unregister the context and exit.
 *
 * Locking: pq->ltp_mutex is normally held while queue state is
 * examined.  While the pool is pausing and this was the queue's last
 * active thread, the thread holds pool->ltp_mutex instead; pool_lock
 * records which of the two is currently held.
 *
 * Always returns NULL (after calling ldap_pvt_thread_exit(NULL)).
 */
949 static void *
ldap_int_thread_pool_wrapper(void * xpool)950 ldap_int_thread_pool_wrapper (
951           void *xpool )
952 {
953           struct ldap_int_thread_poolq_s *pq = xpool;
954           struct ldap_int_thread_pool_s *pool = pq->ltp_pool;
955           ldap_int_thread_task_t *task;
956           ldap_int_tpool_plist_t *work_list;
957           ldap_int_thread_userctx_t ctx, *kctx;
958           unsigned i, keyslot, hash;
              /* pool_lock: nonzero while pool->ltp_mutex is held in place of
               * pq->ltp_mutex.  freeme: set when this thread must destroy the
               * queue's cond/mutex and release pq->ltp_free on exit. */
959           int pool_lock = 0, freeme = 0;
960 
961           assert(pool != NULL);
962 
              /* Start with an empty key table. */
963           for ( i=0; i<MAXKEYS; i++ ) {
964                     ctx.ltu_key[i].ltk_key = NULL;
965           }
966 
967           ctx.ltu_pq = pq;
968           ctx.ltu_id = ldap_pvt_thread_self();
969           TID_HASH(ctx.ltu_id, hash);
970 
971           ldap_pvt_thread_key_setdata( ldap_tpool_key, &ctx );
972 
              /* Do not touch thread_keys[] until any pause has ended. */
973           if (pool->ltp_pause) {
974                     ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
975                     /* thread_keys[] is read-only when paused */
976                     while (pool->ltp_pause)
977                               ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
978                     ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
979           }
980 
981           /* find a key slot to give this thread ID and store a
982            * pointer to our keys there; start at the thread ID
983            * itself (mod LDAP_MAXTHR) and look for an empty slot.
984            */
              /* The probe below is linear with wrap-around; a slot holding
               * NULL or DELETED_THREAD_CTX is taken.  The for statement has
               * an empty body. */
985           ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
986           for (keyslot = hash & (LDAP_MAXTHR-1);
987                     (kctx = thread_keys[keyslot].ctx) && kctx != DELETED_THREAD_CTX;
988                     keyslot = (keyslot+1) & (LDAP_MAXTHR-1));
989           thread_keys[keyslot].ctx = &ctx;
990           ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
991 
              /* This thread now counts as active rather than starting. */
992           ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
993           pq->ltp_starting--;
994           pq->ltp_active_count++;
995 
996           for (;;) {
997                     work_list = pq->ltp_work_list; /* help the compiler a bit */
998                     task = LDAP_STAILQ_FIRST(work_list);
999                     if (task == NULL) { /* paused or no pending tasks */
                                  /* Stop counting as active.  If that leaves the queue
                                   * with no active thread while a pause is pending, swap
                                   * pq->ltp_mutex for pool->ltp_mutex and report one
                                   * fewer active queue to the pausing thread. */
1000                               if (--(pq->ltp_active_count) < 1) {
1001                                         if (pool->ltp_pause) {
1002                                                   ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1003                                                   ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1004                                                   pool_lock = 1;
1005                                                   if (--(pool->ltp_active_queues) < 1) {
1006                                                             /* Notify pool_pause it is the sole active thread. */
1007                                                             ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
1008                                                   }
1009                                         }
1010                               }
1011 
                                   /* Wait until a task shows up on the work list, or
                                    * leave via "done" when this thread should die. */
1012                               do {
1013                                         if (pool->ltp_finishing || pq->ltp_open_count > pq->ltp_max_count) {
1014                                                   /* Not paused, and either finishing or too many
1015                                                    * threads running (can happen if ltp_max_count
1016                                                    * was reduced).  Let this thread die.
1017                                                    */
1018                                                   goto done;
1019                                         }
1020 
1021                                         /* We could check an idle timer here, and let the
1022                                          * thread die if it has been inactive for a while.
1023                                          * Only die if there are other open threads (i.e.,
1024                                          * always have at least one thread open).
1025                                          * The check should be like this:
1026                                          *   if (pool->ltp_open_count>1 && pool->ltp_starting==0)
1027                                          *       check timer, wait if ltp_pause, leave thread;
1028                                          *
1029                                          * Just use pthread_cond_timedwait() if we want to
1030                                          * check idle time.
1031                                          */
                                             /* Wait on the condition variable that belongs to
                                              * the mutex currently held.  Once the pause is
                                              * over, switch back to pq->ltp_mutex. */
1032                                         if (pool_lock) {
1033                                                   ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1034                                                   if (!pool->ltp_pause) {
1035                                                             ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1036                                                             ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1037                                                             pool_lock = 0;
1038                                                   }
1039                                         } else
1040                                                   ldap_pvt_thread_cond_wait(&pq->ltp_cond, &pq->ltp_mutex);
1041 
1042                                         work_list = pq->ltp_work_list;
1043                                         task = LDAP_STAILQ_FIRST(work_list);
1044                               } while (task == NULL);
1045 
                                   /* A task is available: make sure pq->ltp_mutex is the
                                    * mutex held before touching the queue counters. */
1046                               if (pool_lock) {
1047                                         ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1048                                         ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1049                                         pool_lock = 0;
1050                               }
1051                               pq->ltp_active_count++;
1052                     }
1053 
                         /* Claim the task, then run it with the queue unlocked. */
1054                     LDAP_STAILQ_REMOVE_HEAD(work_list, ltt_next.q);
1055                     pq->ltp_pending_count--;
1056                     ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1057 
1058                     task->ltt_start_routine(&ctx, task->ltt_arg);
1059 
                         /* Recycle the task structure onto the queue's free list. */
1060                     ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1061                     LDAP_SLIST_INSERT_HEAD(&pq->ltp_free_list, task, ltt_next.l);
1062           }
1063  done:
               /* Reached with either pq->ltp_mutex or, when pool_lock is set,
                * pool->ltp_mutex still held. */
1064 
1065           ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1066 
1067           /* The pool_mutex lock protects ctx->ltu_key from pool_purgekey()
1068            * during this call, since it prevents new pauses. */
1069           ldap_pvt_thread_pool_context_reset(&ctx);
1070 
               /* Mark the slot as reusable; ctx lives on this thread's stack. */
1071           thread_keys[keyslot].ctx = DELETED_THREAD_CTX;
1072           ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1073 
               /* NOTE(review): when pool_lock is set, ltp_open_count is updated
                * here under pool->ltp_mutex rather than pq->ltp_mutex -- confirm
                * that this is intended. */
1074           pq->ltp_open_count--;
1075           if (pq->ltp_open_count == 0) {
1076                     if (pool->ltp_finishing)
1077                               /* let pool_destroy know we're all done */
1078                               ldap_pvt_thread_cond_signal(&pq->ltp_cond);
1079                     else
1080                               freeme = 1;
1081           }
1082 
1083           if (pool_lock)
1084                     ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1085           else
1086                     ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1087 
               /* Last thread of a queue in a pool that is not finishing:
                * tear down the queue's synchronisation objects and release
                * pq->ltp_free. */
1088           if (freeme) {
1089                     ldap_pvt_thread_cond_destroy(&pq->ltp_cond);
1090                     ldap_pvt_thread_mutex_destroy(&pq->ltp_mutex);
1091                     LDAP_FREE(pq->ltp_free);
1092                     pq->ltp_free = NULL;
1093           }
1094           ldap_pvt_thread_exit(NULL);
1095           return(NULL);
1096 }
1097 
1098 /* Arguments > ltp_pause to handle_pause(,PAUSE_ARG()).  arg=PAUSE_ARG
1099  * ensures (arg-ltp_pause) sets GO_* at need and keeps DO_PAUSE/GO_*.
1100  */
/*
 * Worked values (ltp_pause is 0, 1 or 2 and is subtracted from the
 * argument inside handle_pause()):
 *   PAUSE_ARG(GO_IDLE)     = 15 -> 15/14/13: GO_IDLE set, GO_UNIDLE clear.
 *   PAUSE_ARG(GO_UNIDLE)   = 23 -> 23/22/21: GO_UNIDLE set, GO_IDLE clear.
 *   PAUSE_ARG(CHECK_PAUSE) = 32 -> 32: no step; 31/30: GO_IDLE and GO_UNIDLE.
 *   PAUSE_ARG(DO_PAUSE)    = 96 -> 96: DO_PAUSE only;
 *                                  95/94: DO_PAUSE, GO_IDLE and GO_UNIDLE.
 */
1101 #define GO_IDLE               8
1102 #define GO_UNIDLE   16
1103 #define CHECK_PAUSE 32        /* if ltp_pause: GO_IDLE; wait; GO_UNIDLE */
1104 #define DO_PAUSE    64        /* CHECK_PAUSE; pause the pool */
1105 #define PAUSE_ARG(a) \
1106                     ((a) | ((a) & (GO_IDLE|GO_UNIDLE) ? GO_IDLE-1 : CHECK_PAUSE))
1107 
/*
 * Shared implementation of pool_idle(), pool_unidle(),
 * pool_pausecheck() and pool_pause().
 *
 * pause_type is PAUSE_ARG() of GO_IDLE, GO_UNIDLE, CHECK_PAUSE or
 * DO_PAUSE.  After the current ltp_pause is subtracted, the GO_IDLE,
 * GO_UNIDLE and DO_PAUSE bits select which of the three steps below
 * run, in that order, all under pool->ltp_mutex.
 *
 * The queue acted on is the ltu_pq of the calling thread's context.
 *
 * Returns -1 if tpool is NULL or the calling context has no queue,
 * 1 if the GO_UNIDLE step waited for a pause and no DO_PAUSE step
 * followed, and 0 otherwise (including when *tpool is NULL).
 */
1108 static int
handle_pause(ldap_pvt_thread_pool_t * tpool,int pause_type)1109 handle_pause( ldap_pvt_thread_pool_t *tpool, int pause_type )
1110 {
1111           struct ldap_int_thread_pool_s *pool;
1112           struct ldap_int_thread_poolq_s *pq;
1113           int ret = 0, pause, max_ltp_pause;
1114 
1115           if (tpool == NULL)
1116                     return(-1);
1117 
1118           pool = *tpool;
1119 
1120           if (pool == NULL)
1121                     return(0);
1122 
                /* Fast path for pausecheck: ltp_pause is tested without the lock. */
1123           if (pause_type == CHECK_PAUSE && !pool->ltp_pause)
1124                     return(0);
1125 
1126           {
1127                     ldap_int_thread_userctx_t *ctx = ldap_pvt_thread_pool_context();
1128                     pq = ctx->ltu_pq;
1129                     if ( !pq )
1130                               return(-1);
1131           }
1132 
1133           /* Let pool_unidle() ignore requests for new pauses */
                /* i.e. pool_unidle() waits only while ltp_pause is PAUSED; every
                 * other caller waits while ltp_pause is nonzero. */
1134           max_ltp_pause = pause_type==PAUSE_ARG(GO_UNIDLE) ? WANT_PAUSE : NOT_PAUSED;
1135 
1136           ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1137 
1138           pause = pool->ltp_pause;      /* NOT_PAUSED, WANT_PAUSE or PAUSED */
1139 
1140           /* If ltp_pause and not GO_IDLE|GO_UNIDLE: Set GO_IDLE,GO_UNIDLE */
1141           pause_type -= pause;
1142 
1143           if (pause_type & GO_IDLE) {
                          /* Count this task as pending instead of active.  If a pause
                           * is pending and the queue now has no active thread, report
                           * one fewer active queue and wake the pausing thread once
                           * none are left. */
1144                     int do_pool = 0;
1145                     ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1146                     pq->ltp_pending_count++;
1147                     pq->ltp_active_count--;
1148                     if (pause && pq->ltp_active_count < 1) {
1149                               do_pool = 1;
1150                     }
1151                     ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1152                     if (do_pool) {
1153                               pool->ltp_active_queues--;
1154                               if (pool->ltp_active_queues < 1)
1155                               /* Tell the task waiting to DO_PAUSE it can proceed */
1156                                         ldap_pvt_thread_cond_signal(&pool->ltp_pcond);
1157                     }
1158           }
1159 
1160           if (pause_type & GO_UNIDLE) {
1161                     /* Wait out pause if any, then cancel GO_IDLE */
1162                     if (pause > max_ltp_pause) {
1163                               ret = 1;
1164                               do {
1165                                         ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1166                               } while (pool->ltp_pause > max_ltp_pause);
1167                     }
                          /* Undo the counter changes made by the GO_IDLE step. */
1168                     ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1169                     pq->ltp_pending_count--;
1170                     pq->ltp_active_count++;
1171                     ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1172           }
1173 
1174           if (pause_type & DO_PAUSE) {
1175                     int i, j;
1176                     /* Tell everyone else to pause or finish, then await that */
1177                     ret = 0;
1178                     assert(!pool->ltp_pause);
1179                     pool->ltp_pause = WANT_PAUSE;
1180                     pool->ltp_active_queues = 0;
1181 
                          /* Find the index of the caller's own queue.
                           * NOTE(review): assumes pq is always one of ltp_wqs[];
                           * otherwise i ends up equal to ltp_numqs -- confirm. */
1182                     for (i=0; i<pool->ltp_numqs; i++)
1183                               if (pool->ltp_wqs[i] == pq) break;
1184 
1185                     ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1186                     /* temporarily remove ourself from active count */
1187                     pq->ltp_active_count--;
1188 
                          /* Visit every queue once, starting with our own (whose
                           * mutex is already held) and wrapping around; count the
                           * queues that still have active threads. */
1189                     j=i;
1190                     do {
1191                               pq = pool->ltp_wqs[j];
1192                               if (j != i)
1193                                         ldap_pvt_thread_mutex_lock(&pq->ltp_mutex);
1194 
1195                               /* Hide pending tasks from ldap_pvt_thread_pool_wrapper() */
1196                               pq->ltp_work_list = &empty_pending_list;
1197 
1198                               if (pq->ltp_active_count > 0)
1199                                         pool->ltp_active_queues++;
1200 
1201                               ldap_pvt_thread_mutex_unlock(&pq->ltp_mutex);
1202                               if (pool->ltp_numqs > 1) {
1203                                         j++;
1204                                         j %= pool->ltp_numqs;
1205                               }
1206                     } while (j != i);
1207 
1208                     /* Wait for this task to become the sole active task */
1209                     while (pool->ltp_active_queues > 0)
1210                               ldap_pvt_thread_cond_wait(&pool->ltp_pcond, &pool->ltp_mutex);
1211 
1212                     /* restore us to active count */
                          /* NOTE(review): this increment is made holding only
                           * pool->ltp_mutex, not the queue's own mutex -- confirm
                           * that this is safe while everyone else is paused. */
1213                     pool->ltp_wqs[i]->ltp_active_count++;
1214 
1215                     assert(pool->ltp_pause == WANT_PAUSE);
1216                     pool->ltp_pause = PAUSED;
1217           }
1218           ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1219 
1220           return(ret);
1221 }
1222 
1223 /* Consider this task idle: It will not block pool_pause() in other tasks. */
1224 void
ldap_pvt_thread_pool_idle(ldap_pvt_thread_pool_t * tpool)1225 ldap_pvt_thread_pool_idle( ldap_pvt_thread_pool_t *tpool )
1226 {
1227           handle_pause(tpool, PAUSE_ARG(GO_IDLE));
1228 }
1229 
1230 /* Cancel pool_idle(). If the pool is paused, wait it out first. */
1231 void
ldap_pvt_thread_pool_unidle(ldap_pvt_thread_pool_t * tpool)1232 ldap_pvt_thread_pool_unidle( ldap_pvt_thread_pool_t *tpool )
1233 {
1234           handle_pause(tpool, PAUSE_ARG(GO_UNIDLE));
1235 }
1236 
1237 /*
1238  * If a pause was requested, wait for it.  If several threads
1239  * are waiting to pause, let through one or more pauses.
1240  * The calling task must be active, not idle.
1241  * Return 1 if we waited, 0 if not, -1 at parameter error.
1242  */
1243 int
ldap_pvt_thread_pool_pausecheck(ldap_pvt_thread_pool_t * tpool)1244 ldap_pvt_thread_pool_pausecheck( ldap_pvt_thread_pool_t *tpool )
1245 {
1246           return handle_pause(tpool, PAUSE_ARG(CHECK_PAUSE));
1247 }
1248 
1249 /*
1250  * Wait for a pause, from a non-pooled thread.
1251  */
1252 int
ldap_pvt_thread_pool_pausecheck_native(ldap_pvt_thread_pool_t * tpool)1253 ldap_pvt_thread_pool_pausecheck_native( ldap_pvt_thread_pool_t *tpool )
1254 {
1255           struct ldap_int_thread_pool_s *pool;
1256 
1257           if (tpool == NULL)
1258                     return(-1);
1259 
1260           pool = *tpool;
1261 
1262           if (pool == NULL)
1263                     return(0);
1264 
1265           if (!pool->ltp_pause)
1266                     return(0);
1267 
1268           ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1269           while (pool->ltp_pause)
1270                               ldap_pvt_thread_cond_wait(&pool->ltp_cond, &pool->ltp_mutex);
1271           ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1272           return 1;
1273 }
1274 
1275 /*
1276  * Pause the pool.  The calling task must be active, not idle.
1277  * Return when all other tasks are paused or idle.
1278  */
1279 int
ldap_pvt_thread_pool_pause(ldap_pvt_thread_pool_t * tpool)1280 ldap_pvt_thread_pool_pause( ldap_pvt_thread_pool_t *tpool )
1281 {
1282           return handle_pause(tpool, PAUSE_ARG(DO_PAUSE));
1283 }
1284 
1285 /* End a pause */
1286 int
ldap_pvt_thread_pool_resume(ldap_pvt_thread_pool_t * tpool)1287 ldap_pvt_thread_pool_resume (
1288           ldap_pvt_thread_pool_t *tpool )
1289 {
1290           struct ldap_int_thread_pool_s *pool;
1291           struct ldap_int_thread_poolq_s *pq;
1292           int i;
1293 
1294           if (tpool == NULL)
1295                     return(-1);
1296 
1297           pool = *tpool;
1298 
1299           if (pool == NULL)
1300                     return(0);
1301 
1302           ldap_pvt_thread_mutex_lock(&pool->ltp_mutex);
1303           assert(pool->ltp_pause == PAUSED);
1304           pool->ltp_pause = 0;
1305           for (i=0; i<pool->ltp_numqs; i++) {
1306                     pq = pool->ltp_wqs[i];
1307                     pq->ltp_work_list = &pq->ltp_pending_list;
1308                     ldap_pvt_thread_cond_broadcast(&pq->ltp_cond);
1309           }
1310           ldap_pvt_thread_cond_broadcast(&pool->ltp_cond);
1311           ldap_pvt_thread_mutex_unlock(&pool->ltp_mutex);
1312           return(0);
1313 }
1314 
1315 /*
1316  * Get the key's data and optionally free function in the given context.
1317  */
ldap_pvt_thread_pool_getkey(void * xctx,void * key,void ** data,ldap_pvt_thread_pool_keyfree_t ** kfree)1318 int ldap_pvt_thread_pool_getkey(
1319           void *xctx,
1320           void *key,
1321           void **data,
1322           ldap_pvt_thread_pool_keyfree_t **kfree )
1323 {
1324           ldap_int_thread_userctx_t *ctx = xctx;
1325           int i;
1326 
1327           if ( !ctx || !key || !data ) return EINVAL;
1328 
1329           for ( i=0; i<MAXKEYS && ctx->ltu_key[i].ltk_key; i++ ) {
1330                     if ( ctx->ltu_key[i].ltk_key == key ) {
1331                               *data = ctx->ltu_key[i].ltk_data;
1332                               if ( kfree ) *kfree = ctx->ltu_key[i].ltk_free;
1333                               return 0;
1334                     }
1335           }
1336           return ENOENT;
1337 }
1338 
1339 static void
clear_key_idx(ldap_int_thread_userctx_t * ctx,int i)1340 clear_key_idx( ldap_int_thread_userctx_t *ctx, int i )
1341 {
1342           for ( ; i < MAXKEYS-1 && ctx->ltu_key[i+1].ltk_key; i++ )
1343                     ctx->ltu_key[i] = ctx->ltu_key[i+1];
1344           ctx->ltu_key[i].ltk_key = NULL;
1345 }
1346 
1347 /*
1348  * Set or remove data for the key in the given context.
1349  * key can be any unique pointer.
1350  * kfree() is an optional function to free the data (but not the key):
1351  *   pool_context_reset() and pool_purgekey() call kfree(key, data),
1352  *   but pool_setkey() does not.  For pool_setkey() it is the caller's
1353  *   responsibility to free any existing data with the same key.
1354  *   kfree() must not call functions taking a tpool argument.
1355  */
ldap_pvt_thread_pool_setkey(void * xctx,void * key,void * data,ldap_pvt_thread_pool_keyfree_t * kfree,void ** olddatap,ldap_pvt_thread_pool_keyfree_t ** oldkfreep)1356 int ldap_pvt_thread_pool_setkey(
1357           void *xctx,
1358           void *key,
1359           void *data,
1360           ldap_pvt_thread_pool_keyfree_t *kfree,
1361           void **olddatap,
1362           ldap_pvt_thread_pool_keyfree_t **oldkfreep )
1363 {
1364           ldap_int_thread_userctx_t *ctx = xctx;
1365           int i, found;
1366 
1367           if ( !ctx || !key ) return EINVAL;
1368 
1369           for ( i=found=0; i<MAXKEYS; i++ ) {
1370                     if ( ctx->ltu_key[i].ltk_key == key ) {
1371                               found = 1;
1372                               break;
1373                     } else if ( !ctx->ltu_key[i].ltk_key ) {
1374                               break;
1375                     }
1376           }
1377 
1378           if ( olddatap ) {
1379                     if ( found ) {
1380                               *olddatap = ctx->ltu_key[i].ltk_data;
1381                     } else {
1382                               *olddatap = NULL;
1383                     }
1384           }
1385 
1386           if ( oldkfreep ) {
1387                     if ( found ) {
1388                               *oldkfreep = ctx->ltu_key[i].ltk_free;
1389                     } else {
1390                               *oldkfreep = 0;
1391                     }
1392           }
1393 
1394           if ( data || kfree ) {
1395                     if ( i>=MAXKEYS )
1396                               return ENOMEM;
1397                     ctx->ltu_key[i].ltk_key = key;
1398                     ctx->ltu_key[i].ltk_data = data;
1399                     ctx->ltu_key[i].ltk_free = kfree;
1400           } else if ( found ) {
1401                     clear_key_idx( ctx, i );
1402           }
1403 
1404           return 0;
1405 }
1406 
1407 /* Free all elements with this key, no matter which thread they're in.
1408  * May only be called while the pool is paused.
1409  */
ldap_pvt_thread_pool_purgekey(void * key)1410 void ldap_pvt_thread_pool_purgekey( void *key )
1411 {
1412           int i, j;
1413           ldap_int_thread_userctx_t *ctx;
1414 
1415           assert ( key != NULL );
1416 
1417           ldap_pvt_thread_mutex_lock(&ldap_pvt_thread_pool_mutex);
1418           for ( i=0; i<LDAP_MAXTHR; i++ ) {
1419                     ctx = thread_keys[i].ctx;
1420                     if ( ctx && ctx != DELETED_THREAD_CTX ) {
1421                               for ( j=0; j<MAXKEYS && ctx->ltu_key[j].ltk_key; j++ ) {
1422                                         if ( ctx->ltu_key[j].ltk_key == key ) {
1423                                                   if (ctx->ltu_key[j].ltk_free)
1424                                                             ctx->ltu_key[j].ltk_free( ctx->ltu_key[j].ltk_key,
1425                                                             ctx->ltu_key[j].ltk_data );
1426                                                   clear_key_idx( ctx, j );
1427                                                   break;
1428                                         }
1429                               }
1430                     }
1431           }
1432           ldap_pvt_thread_mutex_unlock(&ldap_pvt_thread_pool_mutex);
1433 }
1434 
1435 /*
1436  * Find the context of the current thread.
1437  * This is necessary if the caller does not have access to the
1438  * thread context handle (for example, a slapd plugin calling
1439  * slapi_search_internal()). No doubt it is more efficient
1440  * for the application to keep track of the thread context
1441  * handles itself.
1442  */
ldap_pvt_thread_pool_context()1443 void *ldap_pvt_thread_pool_context( )
1444 {
1445           void *ctx = NULL;
1446 
1447           ldap_pvt_thread_key_getdata( ldap_tpool_key, &ctx );
1448           return ctx ? ctx : (void *) &ldap_int_main_thrctx;
1449 }
1450 
1451 /*
1452  * Free the context's keys.
1453  * Must not call functions taking a tpool argument (because this
1454  * thread already holds ltp_mutex when called from pool_wrapper()).
1455  */
ldap_pvt_thread_pool_context_reset(void * vctx)1456 void ldap_pvt_thread_pool_context_reset( void *vctx )
1457 {
1458           ldap_int_thread_userctx_t *ctx = vctx;
1459           int i;
1460 
1461           for ( i=MAXKEYS-1; i>=0; i--) {
1462                     if ( !ctx->ltu_key[i].ltk_key )
1463                               continue;
1464                     if ( ctx->ltu_key[i].ltk_free )
1465                               ctx->ltu_key[i].ltk_free( ctx->ltu_key[i].ltk_key,
1466                               ctx->ltu_key[i].ltk_data );
1467                     ctx->ltu_key[i].ltk_key = NULL;
1468           }
1469 }
1470 
ldap_pvt_thread_pool_tid(void * vctx)1471 ldap_pvt_thread_t ldap_pvt_thread_pool_tid( void *vctx )
1472 {
1473           ldap_int_thread_userctx_t *ctx = vctx;
1474 
1475           return ctx->ltu_id;
1476 }
1477 #endif /* LDAP_THREAD_HAVE_TPOOL */
1478 
1479 #endif /* LDAP_R_COMPILE */
1480