1 /*        $NetBSD: work_thread.c,v 1.9 2024/08/18 20:47:13 christos Exp $       */
2 
3 /*
4  * work_thread.c - threads implementation for blocking worker child.
5  */
6 #include <config.h>
7 #include "ntp_workimpl.h"
8 
9 #ifdef WORK_THREAD
10 
11 #include <stdio.h>
12 #include <ctype.h>
13 #include <signal.h>
14 #ifndef SYS_WINNT
15 #include <pthread.h>
16 #endif
17 
18 #include "ntp_stdlib.h"
19 #include "ntp_malloc.h"
20 #include "ntp_syslog.h"
21 #include "ntpd.h"
22 #include "ntp_io.h"
23 #include "ntp_assert.h"
24 #include "ntp_unixtime.h"
25 #include "timespecops.h"
26 #include "ntp_worker.h"
27 
/* Sentinel "pointer" (all bits set) queued to ask a worker to exit.
 * The worker echoes the same value back as CHILD_GONE_RESP, which
 * makes the parent tear down the child's resources. Neither is a real
 * header and must never be dereferenced.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results...  Whether this really
 * pays off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif

/* need a good integer to store a pointer... (used by the POSIX
 * thread_exit() below to pass an int exit code through 'void *')
 */
#ifndef UINTPTR_T
# if defined(UINTPTR_MAX)
#  define UINTPTR_T uintptr_t
# elif defined(UINT_PTR)
#  define UINTPTR_T UINT_PTR
# else
#  define UINTPTR_T size_t
# endif
#endif


#ifdef SYS_WINNT

/* Windows: thread exit and semaphore post map onto the Win32 API. */
# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
/* NOTE(review): declared only for SYS_WINNT but defined under
 * !WORK_PIPE -- presumably the two always coincide; confirm.
 */
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

/* POSIX: thread exit and semaphore post map onto pthreads / sem_post. */
# define thread_exit(c)	pthread_exit((void*)(UINTPTR_T)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

/* Hook used to (un)register the response notification channel with
 * the daemon's I/O loop: a file descriptor when signalling via pipe,
 * a semaphore handle otherwise.
 */
#ifdef WORK_PIPE
addremove_io_fd_func			addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

/* Storage and handle of the global worker lock taken by
 * worker_global_lock(); the handle stays NULL until
 * prepare_child_sems() creates it for the first child.
 */
static sema_type worker_mmutex;
static sem_ref   worker_memlock;
97 /* --------------------------------------------------------------------
98  * locking the global worker state table (and other global stuff)
99  */
100 void
worker_global_lock(int inOrOut)101 worker_global_lock(
102           int inOrOut)
103 {
104           if (worker_memlock) {
105                     if (inOrOut)
106                               wait_for_sem(worker_memlock, NULL);
107                     else
108                               tickle_sem(worker_memlock);
109           }
110 }
111 
/* --------------------------------------------------------------------
 * exit_worker() - terminate the calling worker thread, handing
 * 'exitcode' to the platform's thread exit primitive (see the
 * thread_exit macro). Does not return.
 */
void
exit_worker(
	int	exitcode
	)
{
	thread_exit(exitcode);
}
122 
123 /* --------------------------------------------------------------------
124  * sleep for a given time or until the wakup semaphore is tickled.
125  */
126 int
worker_sleep(blocking_child * c,time_t seconds)127 worker_sleep(
128           blocking_child *    c,
129           time_t                        seconds
130           )
131 {
132           struct timespec     until;
133           int                 rc;
134 
135 # ifdef HAVE_CLOCK_GETTIME
136           if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
137                     msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
138                     return -1;
139           }
140 # else
141           if (0 != getclock(TIMEOFDAY, &until)) {
142                     msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
143                     return -1;
144           }
145 # endif
146           until.tv_sec += seconds;
147           rc = wait_for_sem(c->wake_scheduled_sleep, &until);
148           if (0 == rc)
149                     return -1;
150           if (-1 == rc && ETIMEDOUT == errno)
151                     return 0;
152           msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
153           return -1;
154 }
155 
156 
157 /* --------------------------------------------------------------------
158  * Wake up a worker that takes a nap.
159  */
160 void
interrupt_worker_sleep(void)161 interrupt_worker_sleep(void)
162 {
163           u_int                         idx;
164           blocking_child *    c;
165 
166           for (idx = 0; idx < blocking_children_alloc; idx++) {
167                     c = blocking_children[idx];
168                     if (NULL == c || NULL == c->wake_scheduled_sleep)
169                               continue;
170                     tickle_sem(c->wake_scheduled_sleep);
171           }
172 }
173 
174 /* --------------------------------------------------------------------
175  * Make sure there is an empty slot at the head of the request
176  * queue. Tell if the queue is currently empty.
177  */
178 static int
ensure_workitems_empty_slot(blocking_child * c)179 ensure_workitems_empty_slot(
180           blocking_child *c
181           )
182 {
183           /*
184           ** !!! PRECONDITION: caller holds access lock!
185           **
186           ** This simply tries to increase the size of the buffer if it
187           ** becomes full. The resize operation does *not* maintain the
188           ** order of requests, but that should be irrelevant since the
189           ** processing is considered asynchronous anyway.
190           **
191           ** Return if the buffer is currently empty.
192           */
193 
194           static const size_t each =
195               sizeof(blocking_children[0]->workitems[0]);
196 
197           size_t    new_alloc;
198           size_t  slots_used;
199           size_t    sidx;
200 
201           slots_used = c->head_workitem - c->tail_workitem;
202           if (slots_used >= c->workitems_alloc) {
203                     new_alloc  = c->workitems_alloc + WORKITEMS_ALLOC_INC;
204                     c->workitems = erealloc(c->workitems, new_alloc * each);
205                     for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
206                         c->workitems[sidx] = NULL;
207                     c->tail_workitem   = 0;
208                     c->head_workitem   = c->workitems_alloc;
209                     c->workitems_alloc = new_alloc;
210           }
211           INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
212           return (0 == slots_used);
213 }
214 
215 /* --------------------------------------------------------------------
216  * Make sure there is an empty slot at the head of the response
217  * queue. Tell if the queue is currently empty.
218  */
219 static int
ensure_workresp_empty_slot(blocking_child * c)220 ensure_workresp_empty_slot(
221           blocking_child *c
222           )
223 {
224           /*
225           ** !!! PRECONDITION: caller holds access lock!
226           **
227           ** Works like the companion function above.
228           */
229 
230           static const size_t each =
231               sizeof(blocking_children[0]->responses[0]);
232 
233           size_t    new_alloc;
234           size_t  slots_used;
235           size_t    sidx;
236 
237           slots_used = c->head_response - c->tail_response;
238           if (slots_used >= c->responses_alloc) {
239                     new_alloc  = c->responses_alloc + RESPONSES_ALLOC_INC;
240                     c->responses = erealloc(c->responses, new_alloc * each);
241                     for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
242                         c->responses[sidx] = NULL;
243                     c->tail_response   = 0;
244                     c->head_response   = c->responses_alloc;
245                     c->responses_alloc = new_alloc;
246           }
247           INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
248           return (0 == slots_used);
249 }
250 
251 
252 /* --------------------------------------------------------------------
253  * queue_req_pointer() - append a work item or idle exit request to
254  *                             blocking_workitems[]. Employ proper locking.
255  */
256 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)257 queue_req_pointer(
258           blocking_child      *         c,
259           blocking_pipe_header *        hdr
260           )
261 {
262           size_t qhead;
263 
264           /* >>>> ACCESS LOCKING STARTS >>>> */
265           wait_for_sem(c->accesslock, NULL);
266           ensure_workitems_empty_slot(c);
267           qhead = c->head_workitem;
268           c->workitems[qhead % c->workitems_alloc] = hdr;
269           c->head_workitem = 1 + qhead;
270           tickle_sem(c->accesslock);
271           /* <<<< ACCESS LOCKING ENDS <<<< */
272 
273           /* queue consumer wake-up notification */
274           tickle_sem(c->workitems_pending);
275 
276           return 0;
277 }
278 
279 /* --------------------------------------------------------------------
280  * API function to make sure a worker is running, a proper private copy
281  * of the data is made, the data eneterd into the queue and the worker
282  * is signalled.
283  */
284 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)285 send_blocking_req_internal(
286           blocking_child *    c,
287           blocking_pipe_header *        hdr,
288           void *                        data
289           )
290 {
291           blocking_pipe_header *        threadcopy;
292           size_t                        payload_octets;
293 
294           REQUIRE(hdr != NULL);
295           REQUIRE(data != NULL);
296           DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
297 
298           if (hdr->octets <= sizeof(*hdr))
299                     return 1; /* failure */
300           payload_octets = hdr->octets - sizeof(*hdr);
301 
302           if (NULL == c->thread_ref)
303                     start_blocking_thread(c);
304           threadcopy = emalloc(hdr->octets);
305           memcpy(threadcopy, hdr, sizeof(*hdr));
306           memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
307 
308           return queue_req_pointer(c, threadcopy);
309 }
310 
311 /* --------------------------------------------------------------------
312  * Wait for the 'incoming queue no longer empty' signal, lock the shared
313  * structure and dequeue an item.
314  */
315 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)316 receive_blocking_req_internal(
317           blocking_child *    c
318           )
319 {
320           blocking_pipe_header *        req;
321           size_t                        qhead, qtail;
322 
323           req = NULL;
324           do {
325                     /* wait for tickle from the producer side */
326                     wait_for_sem(c->workitems_pending, NULL);
327 
328                     /* >>>> ACCESS LOCKING STARTS >>>> */
329                     wait_for_sem(c->accesslock, NULL);
330                     qhead = c->head_workitem;
331                     do {
332                               qtail = c->tail_workitem;
333                               if (qhead == qtail)
334                                         break;
335                               c->tail_workitem = qtail + 1;
336                               qtail %= c->workitems_alloc;
337                               req = c->workitems[qtail];
338                               c->workitems[qtail] = NULL;
339                     } while (NULL == req);
340                     tickle_sem(c->accesslock);
341                     /* <<<< ACCESS LOCKING ENDS <<<< */
342 
343           } while (NULL == req);
344 
345           INSIST(NULL != req);
346           if (CHILD_EXIT_REQ == req) {  /* idled out */
347                     send_blocking_resp_internal(c, CHILD_GONE_RESP);
348                     req = NULL;
349           }
350 
351           return req;
352 }
353 
354 /* --------------------------------------------------------------------
355  * Push a response into the return queue and eventually tickle the
356  * receiver.
357  */
358 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)359 send_blocking_resp_internal(
360           blocking_child *    c,
361           blocking_pipe_header *        resp
362           )
363 {
364           size_t    qhead;
365           int       empty;
366 
367           /* >>>> ACCESS LOCKING STARTS >>>> */
368           wait_for_sem(c->accesslock, NULL);
369           empty = ensure_workresp_empty_slot(c);
370           qhead = c->head_response;
371           c->responses[qhead % c->responses_alloc] = resp;
372           c->head_response = 1 + qhead;
373           tickle_sem(c->accesslock);
374           /* <<<< ACCESS LOCKING ENDS <<<< */
375 
376           /* queue consumer wake-up notification */
377           if (empty)
378           {
379 #             ifdef WORK_PIPE
380                     if (1 != write(c->resp_write_pipe, "", 1))
381                               msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
382                                         " failed to notify main thread!",
383                                         (BLOCKING_GETNAMEINFO == resp->rtype)
384                                             ? "name"
385                                             : "addr"
386                                         );
387 #             else
388                     tickle_sem(c->responses_pending);
389 #             endif
390           }
391           return 0;
392 }
393 
394 
395 #ifndef WORK_PIPE
396 
397 /* --------------------------------------------------------------------
398  * Check if a (Windows-)handle to a semaphore is actually the same we
399  * are using inside the sema wrapper.
400  */
401 static BOOL
same_os_sema(const sem_ref obj,void * osh)402 same_os_sema(
403           const sem_ref       obj,
404           void*               osh
405           )
406 {
407           return obj && osh && (obj->shnd == (HANDLE)osh);
408 }
409 
410 /* --------------------------------------------------------------------
411  * Find the shared context that associates to an OS handle and make sure
412  * the data is dequeued and processed.
413  */
414 void
handle_blocking_resp_sem(void * context)415 handle_blocking_resp_sem(
416           void *    context
417           )
418 {
419           blocking_child *    c;
420           u_int                         idx;
421 
422           c = NULL;
423           for (idx = 0; idx < blocking_children_alloc; idx++) {
424                     c = blocking_children[idx];
425                     if (c != NULL &&
426                               c->thread_ref != NULL &&
427                               same_os_sema(c->responses_pending, context))
428                               break;
429           }
430           if (idx < blocking_children_alloc)
431                     process_blocking_resp(c);
432 }
433 #endif    /* !WORK_PIPE */
434 
435 /* --------------------------------------------------------------------
436  * Fetch the next response from the return queue. In case of signalling
437  * via pipe, make sure the pipe is flushed, too.
438  */
439 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)440 receive_blocking_resp_internal(
441           blocking_child *    c
442           )
443 {
444           blocking_pipe_header *        removed;
445           size_t                        qhead, qtail, slot;
446 
447 #ifdef WORK_PIPE
448           int                           rc;
449           char                          scratch[32];
450 
451           do
452                     rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
453           while (-1 == rc && EINTR == errno);
454 #endif
455 
456           /* >>>> ACCESS LOCKING STARTS >>>> */
457           wait_for_sem(c->accesslock, NULL);
458           qhead = c->head_response;
459           qtail = c->tail_response;
460           for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
461                     slot = qtail % c->responses_alloc;
462                     removed = c->responses[slot];
463                     c->responses[slot] = NULL;
464           }
465           c->tail_response = qtail;
466           tickle_sem(c->accesslock);
467           /* <<<< ACCESS LOCKING ENDS <<<< */
468 
469           if (NULL != removed) {
470                     DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
471                                    BLOCKING_RESP_MAGIC == removed->magic_sig);
472           }
473           if (CHILD_GONE_RESP == removed) {
474                     cleanup_after_child(c);
475                     removed = NULL;
476           }
477 
478           return removed;
479 }
480 
481 /* --------------------------------------------------------------------
482  * Light up a new worker.
483  */
484 static void
start_blocking_thread(blocking_child * c)485 start_blocking_thread(
486           blocking_child *    c
487           )
488 {
489 
490           DEBUG_INSIST(!c->reusable);
491 
492           prepare_child_sems(c);
493           start_blocking_thread_internal(c);
494 }
495 
496 /* --------------------------------------------------------------------
497  * Create a worker thread. There are several differences between POSIX
498  * and Windows, of course -- most notably the Windows thread is a
499  * detached thread, and we keep the handle around until we want to get
500  * rid of the thread. The notification scheme also differs: Windows
501  * makes use of semaphores in both directions, POSIX uses a pipe for
502  * integration with 'select()' or alike.
503  */
504 static void
start_blocking_thread_internal(blocking_child * c)505 start_blocking_thread_internal(
506           blocking_child *    c
507           )
508 #ifdef SYS_WINNT
509 {
510           BOOL      resumed;
511 
512           c->thread_ref = NULL;
513           (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
514           c->thr_table[0].thnd =
515                     (HANDLE)_beginthreadex(
516                               NULL,
517                               0,
518                               &blocking_thread,
519                               c,
520                               CREATE_SUSPENDED,
521                               NULL);
522 
523           if (NULL == c->thr_table[0].thnd) {
524                     msyslog(LOG_ERR, "start blocking thread failed: %m");
525                     exit(-1);
526           }
527           /* remember the thread priority is only within the process class */
528           if (!SetThreadPriority(c->thr_table[0].thnd,
529                                      THREAD_PRIORITY_BELOW_NORMAL)) {
530                     msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
531           }
532           if (NULL != pSetThreadDescription) {
533                     (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
534           }
535           resumed = ResumeThread(c->thr_table[0].thnd);
536           DEBUG_INSIST(resumed);
537           c->thread_ref = &c->thr_table[0];
538 }
539 #else     /* pthreads start_blocking_thread_internal() follows */
540 {
541 # ifdef NEED_PTHREAD_INIT
542           static int          pthread_init_called;
543 # endif
544           pthread_attr_t      thr_attr;
545           int                 rc;
546           int                 pipe_ends[2];       /* read then write */
547           int                 is_pipe;
548           int                 flags;
549           size_t              ostacksize;
550           size_t              nstacksize;
551           sigset_t  saved_sig_mask;
552 
553           c->thread_ref = NULL;
554 
555 # ifdef NEED_PTHREAD_INIT
556           /*
557            * from lib/isc/unix/app.c:
558            * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
559            */
560           if (!pthread_init_called) {
561                     pthread_init();
562                     pthread_init_called = TRUE;
563           }
564 # endif
565 
566           rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
567           if (0 != rc) {
568                     msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
569                     exit(1);
570           }
571           c->resp_read_pipe = move_fd(pipe_ends[0]);
572           c->resp_write_pipe = move_fd(pipe_ends[1]);
573           c->ispipe = is_pipe;
574           flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
575           if (-1 == flags) {
576                     msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
577                     exit(1);
578           }
579           rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
580           if (-1 == rc) {
581                     msyslog(LOG_ERR,
582                               "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
583                     exit(1);
584           }
585           (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
586           pthread_attr_init(&thr_attr);
587           pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
588 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
589     defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
590           rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
591           if (0 != rc) {
592                     msyslog(LOG_ERR,
593                               "start_blocking_thread: pthread_attr_getstacksize() -> %s",
594                               strerror(rc));
595           } else {
596                     nstacksize = ostacksize;
597                     /* order is important here: first clamp on upper limit,
598                      * and the PTHREAD min stack size is ultimate override!
599                      */
600                     if (nstacksize > THREAD_MAXSTACKSIZE)
601                               nstacksize = THREAD_MAXSTACKSIZE;
602 #            ifdef PTHREAD_STACK_MAX
603                     if (nstacksize > PTHREAD_STACK_MAX)
604                               nstacksize = PTHREAD_STACK_MAX;
605 #            endif
606 
607                     /* now clamp on lower stack limit. */
608                     if (nstacksize < THREAD_MINSTACKSIZE)
609                               nstacksize = THREAD_MINSTACKSIZE;
610 #            ifdef PTHREAD_STACK_MIN
611                     if (nstacksize < PTHREAD_STACK_MIN)
612                               nstacksize = PTHREAD_STACK_MIN;
613 #            endif
614 
615                     if (nstacksize != ostacksize)
616                               rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
617                     if (0 != rc)
618                               msyslog(LOG_ERR,
619                                         "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
620                                         (u_long)ostacksize, (u_long)nstacksize,
621                                         strerror(rc));
622           }
623 #else
624           UNUSED_ARG(nstacksize);
625           UNUSED_ARG(ostacksize);
626 #endif
627 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
628           pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
629 #endif
630           c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
631           block_thread_signals(&saved_sig_mask);
632           rc = pthread_create(&c->thr_table[0], &thr_attr,
633                                   &blocking_thread, c);
634           pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
635           pthread_attr_destroy(&thr_attr);
636           if (0 != rc) {
637                     msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
638                               strerror(rc));
639                     exit(1);
640           }
641           c->thread_ref = &c->thr_table[0];
642 }
643 #endif
644 
645 /* --------------------------------------------------------------------
646  * block_thread_signals()
647  *
648  * Temporarily block signals used by ntpd main thread, so that signal
649  * mask inherited by child threads leaves them blocked.  Returns prior
650  * active signal mask via pmask, to be restored by the main thread
651  * after pthread_create().
652  */
653 #ifndef SYS_WINNT
654 void
block_thread_signals(sigset_t * pmask)655 block_thread_signals(
656           sigset_t *          pmask
657           )
658 {
659           sigset_t  block;
660 
661           sigemptyset(&block);
662 # ifdef HAVE_SIGNALED_IO
663 #  ifdef SIGIO
664           sigaddset(&block, SIGIO);
665 #  endif
666 #  ifdef SIGPOLL
667           sigaddset(&block, SIGPOLL);
668 #  endif
669 # endif   /* HAVE_SIGNALED_IO */
670           sigaddset(&block, SIGALRM);
671           sigaddset(&block, MOREDEBUGSIG);
672           sigaddset(&block, LESSDEBUGSIG);
673 # ifdef SIGDIE1
674           sigaddset(&block, SIGDIE1);
675 # endif
676 # ifdef SIGDIE2
677           sigaddset(&block, SIGDIE2);
678 # endif
679 # ifdef SIGDIE3
680           sigaddset(&block, SIGDIE3);
681 # endif
682 # ifdef SIGDIE4
683           sigaddset(&block, SIGDIE4);
684 # endif
685 # ifdef SIGBUS
686           sigaddset(&block, SIGBUS);
687 # endif
688           sigemptyset(pmask);
689           pthread_sigmask(SIG_BLOCK, &block, pmask);
690 }
691 #endif    /* !SYS_WINNT */
692 
693 
694 /* --------------------------------------------------------------------
695  * Create & destroy semaphores. This is sufficiently different between
696  * POSIX and Windows to warrant wrapper functions and close enough to
697  * use the concept of synchronization via semaphore for all platforms.
698  */
699 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)700 create_sema(
701           sema_type*          semptr,
702           u_int               inival,
703           u_int               maxval)
704 {
705 #ifdef SYS_WINNT
706 
707           long svini, svmax;
708           if (NULL != semptr) {
709                     svini = (inival < LONG_MAX)
710                         ? (long)inival : LONG_MAX;
711                     svmax = (maxval < LONG_MAX && maxval > 0)
712                         ? (long)maxval : LONG_MAX;
713                     semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
714                     if (NULL == semptr->shnd)
715                               semptr = NULL;
716           }
717 
718 #else
719 
720           (void)maxval;
721           if (semptr && sem_init(semptr, FALSE, inival))
722                     semptr = NULL;
723 
724 #endif
725 
726           return semptr;
727 }
728 
729 /* ------------------------------------------------------------------ */
730 static sem_ref
delete_sema(sem_ref obj)731 delete_sema(
732           sem_ref obj)
733 {
734 
735 #   ifdef SYS_WINNT
736 
737           if (obj) {
738                     if (obj->shnd)
739                               CloseHandle(obj->shnd);
740                     obj->shnd = NULL;
741           }
742 
743 #   else
744 
745           if (obj)
746                     sem_destroy(obj);
747 
748 #   endif
749 
750           return NULL;
751 }
752 
753 /* --------------------------------------------------------------------
754  * prepare_child_sems()
755  *
756  * create sync & access semaphores
757  *
758  * All semaphores are cleared, only the access semaphore has 1 unit.
759  * Childs wait on 'workitems_pending', then grabs 'sema_access'
760  * and dequeues jobs. When done, 'sema_access' is given one unit back.
761  *
762  * The producer grabs 'sema_access', manages the queue, restores
763  * 'sema_access' and puts one unit into 'workitems_pending'.
764  *
765  * The story goes the same for the response queue.
766  */
767 static void
prepare_child_sems(blocking_child * c)768 prepare_child_sems(
769           blocking_child *c
770           )
771 {
772           if (NULL == worker_memlock)
773                     worker_memlock = create_sema(&worker_mmutex, 1, 1);
774 
775           c->accesslock           = create_sema(&c->sem_table[0], 1, 1);
776           c->workitems_pending    = create_sema(&c->sem_table[1], 0, 0);
777           c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
778 #   ifndef WORK_PIPE
779           c->responses_pending    = create_sema(&c->sem_table[3], 0, 0);
780 #   endif
781 }
782 
783 /* --------------------------------------------------------------------
784  * wait for semaphore. Where the wait can be interrupted, it will
785  * internally resume -- When this function returns, there is either no
786  * semaphore at all, a timeout occurred, or the caller could
787  * successfully take a token from the semaphore.
788  *
789  * For untimed wait, not checking the result of this function at all is
790  * definitely an option.
791  */
792 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)793 wait_for_sem(
794           sem_ref                       sem,
795           struct timespec *   timeout             /* wall-clock */
796           )
797 #ifdef SYS_WINNT
798 {
799           struct timespec now;
800           struct timespec delta;
801           DWORD               msec;
802           DWORD               rc;
803 
804           if (!(sem && sem->shnd)) {
805                     errno = EINVAL;
806                     return -1;
807           }
808 
809           if (NULL == timeout) {
810                     msec = INFINITE;
811           } else {
812                     getclock(TIMEOFDAY, &now);
813                     delta = sub_tspec(*timeout, now);
814                     if (delta.tv_sec < 0) {
815                               msec = 0;
816                     } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
817                               msec = INFINITE;
818                     } else {
819                               msec = 1000 * (DWORD)delta.tv_sec;
820                               msec += delta.tv_nsec / (1000 * 1000);
821                     }
822           }
823           rc = WaitForSingleObject(sem->shnd, msec);
824           if (WAIT_OBJECT_0 == rc)
825                     return 0;
826           if (WAIT_TIMEOUT == rc) {
827                     errno = ETIMEDOUT;
828                     return -1;
829           }
830           msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
831           errno = EFAULT;
832           return -1;
833 }
834 #else     /* pthreads wait_for_sem() follows */
835 {
836           int rc = -1;
837 
838           if (sem) do {
839                               if (NULL == timeout)
840                                         rc = sem_wait(sem);
841                               else
842                                         rc = sem_timedwait(sem, timeout);
843                     } while (rc == -1 && errno == EINTR);
844           else
845                     errno = EINVAL;
846 
847           return rc;
848 }
849 #endif
850 
851 /* --------------------------------------------------------------------
852  * blocking_thread - thread functions have WINAPI (aka 'stdcall')
853  * calling conventions under Windows and POSIX-defined signature
854  * otherwise.
855  */
856 #ifdef SYS_WINNT
857 u_int WINAPI
858 #else
859 void *
860 #endif
blocking_thread(void * ThreadArg)861 blocking_thread(
862           void *    ThreadArg
863           )
864 {
865           blocking_child *c;
866 
867           c = ThreadArg;
868           exit_worker(blocking_child_common(c));
869 
870           /* NOTREACHED */
871           return 0;
872 }
873 
874 /* --------------------------------------------------------------------
875  * req_child_exit() runs in the parent.
876  *
877  * This function is called from from the idle timer, too, and possibly
878  * without a thread being there any longer. Since we have folded up our
879  * tent in that case and all the semaphores are already gone, we simply
880  * ignore this request in this case.
881  *
882  * Since the existence of the semaphores is controlled exclusively by
883  * the parent, there's no risk of data race here.
884  */
885 int
req_child_exit(blocking_child * c)886 req_child_exit(
887           blocking_child *c
888           )
889 {
890           return (c->accesslock)
891               ? queue_req_pointer(c, CHILD_EXIT_REQ)
892               : 0;
893 }
894 
895 /* --------------------------------------------------------------------
896  * cleanup_after_child() runs in parent.
897  */
898 static void
cleanup_after_child(blocking_child * c)899 cleanup_after_child(
900           blocking_child *    c
901           )
902 {
903           DEBUG_INSIST(!c->reusable);
904 
905 #   ifdef SYS_WINNT
906           /* The thread was not created in detached state, so we better
907            * clean up.
908            */
909           if (c->thread_ref && c->thread_ref->thnd) {
910                     WaitForSingleObject(c->thread_ref->thnd, INFINITE);
911                     INSIST(CloseHandle(c->thread_ref->thnd));
912                     c->thread_ref->thnd = NULL;
913           }
914 #   endif
915           c->thread_ref = NULL;
916 
917           /* remove semaphores and (if signalling vi IO) pipes */
918 
919           c->accesslock           = delete_sema(c->accesslock);
920           c->workitems_pending    = delete_sema(c->workitems_pending);
921           c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
922 
923 #   ifdef WORK_PIPE
924           DEBUG_INSIST(-1 != c->resp_read_pipe);
925           DEBUG_INSIST(-1 != c->resp_write_pipe);
926           (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
927           close(c->resp_write_pipe);
928           close(c->resp_read_pipe);
929           c->resp_write_pipe = -1;
930           c->resp_read_pipe = -1;
931 #   else
932           DEBUG_INSIST(NULL != c->responses_pending);
933           (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
934           c->responses_pending = delete_sema(c->responses_pending);
935 #   endif
936 
937           /* Is it necessary to check if there are pending requests and
938            * responses? If so, and if there are, what to do with them?
939            */
940 
941           /* re-init buffer index sequencers */
942           c->head_workitem = 0;
943           c->tail_workitem = 0;
944           c->head_response = 0;
945           c->tail_response = 0;
946 
947           c->reusable = TRUE;
948 }
949 
950 
#else	/* !WORK_THREAD follows */
/* Placeholder definition: without WORK_THREAD nothing else is
 * compiled from this file, and ISO C does not allow an empty
 * translation unit.
 */
char work_thread_nonempty_compilation_unit;
#endif
954