1 /*
2 * work_thread.c - threads implementation for blocking worker child.
3 */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORK_THREAD
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25
26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
27 #define CHILD_GONE_RESP CHILD_EXIT_REQ
28 #define WORKITEMS_ALLOC_INC 16
29 #define RESPONSES_ALLOC_INC 4
30
31 #ifndef THREAD_MINSTACKSIZE
32 #define THREAD_MINSTACKSIZE (64U * 1024)
33 #endif
34
35 #ifdef SYS_WINNT
36
37 # define thread_exit(c) _endthreadex(c)
38 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
39 u_int WINAPI blocking_thread(void *);
40 static BOOL same_os_sema(const sem_ref obj, void * osobj);
41
42 #else
43
44 # define thread_exit(c) pthread_exit((void*)(size_t)(c))
45 # define tickle_sem sem_post
46 void * blocking_thread(void *);
47 static void block_thread_signals(sigset_t *);
48
49 #endif
50
51 #ifdef WORK_PIPE
52 addremove_io_fd_func addremove_io_fd;
53 #else
54 addremove_io_semaphore_func addremove_io_semaphore;
55 #endif
56
57 static void start_blocking_thread(blocking_child *);
58 static void start_blocking_thread_internal(blocking_child *);
59 static void prepare_child_sems(blocking_child *);
60 static int wait_for_sem(sem_ref, struct timespec *);
61 static int ensure_workitems_empty_slot(blocking_child *);
62 static int ensure_workresp_empty_slot(blocking_child *);
63 static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
64 static void cleanup_after_child(blocking_child *);
65
66
67 void
exit_worker(int exitcode)68 exit_worker(
69 int exitcode
70 )
71 {
72 thread_exit(exitcode); /* see #define thread_exit */
73 }
74
75 /* --------------------------------------------------------------------
76 * sleep for a given time or until the wakup semaphore is tickled.
77 */
78 int
worker_sleep(blocking_child * c,time_t seconds)79 worker_sleep(
80 blocking_child * c,
81 time_t seconds
82 )
83 {
84 struct timespec until;
85 int rc;
86
87 # ifdef HAVE_CLOCK_GETTIME
88 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
89 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
90 return -1;
91 }
92 # else
93 if (0 != getclock(TIMEOFDAY, &until)) {
94 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
95 return -1;
96 }
97 # endif
98 until.tv_sec += seconds;
99 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
100 if (0 == rc)
101 return -1;
102 if (-1 == rc && ETIMEDOUT == errno)
103 return 0;
104 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
105 return -1;
106 }
107
108
109 /* --------------------------------------------------------------------
110 * Wake up a worker that takes a nap.
111 */
112 void
interrupt_worker_sleep(void)113 interrupt_worker_sleep(void)
114 {
115 u_int idx;
116 blocking_child * c;
117
118 for (idx = 0; idx < blocking_children_alloc; idx++) {
119 c = blocking_children[idx];
120 if (NULL == c || NULL == c->wake_scheduled_sleep)
121 continue;
122 tickle_sem(c->wake_scheduled_sleep);
123 }
124 }
125
126 /* --------------------------------------------------------------------
127 * Make sure there is an empty slot at the head of the request
128 * queue. Tell if the queue is currently empty.
129 */
130 static int
ensure_workitems_empty_slot(blocking_child * c)131 ensure_workitems_empty_slot(
132 blocking_child *c
133 )
134 {
135 /*
136 ** !!! PRECONDITION: caller holds access lock!
137 **
138 ** This simply tries to increase the size of the buffer if it
139 ** becomes full. The resize operation does *not* maintain the
140 ** order of requests, but that should be irrelevant since the
141 ** processing is considered asynchronous anyway.
142 **
143 ** Return if the buffer is currently empty.
144 */
145
146 static const size_t each =
147 sizeof(blocking_children[0]->workitems[0]);
148
149 size_t new_alloc;
150 size_t slots_used;
151
152 slots_used = c->head_workitem - c->tail_workitem;
153 if (slots_used >= c->workitems_alloc) {
154 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
155 c->workitems = erealloc(c->workitems, new_alloc * each);
156 c->tail_workitem = 0;
157 c->head_workitem = c->workitems_alloc;
158 c->workitems_alloc = new_alloc;
159 }
160 return (0 == slots_used);
161 }
162
163 /* --------------------------------------------------------------------
164 * Make sure there is an empty slot at the head of the response
165 * queue. Tell if the queue is currently empty.
166 */
167 static int
ensure_workresp_empty_slot(blocking_child * c)168 ensure_workresp_empty_slot(
169 blocking_child *c
170 )
171 {
172 /*
173 ** !!! PRECONDITION: caller holds access lock!
174 **
175 ** Works like the companion function above.
176 */
177
178 static const size_t each =
179 sizeof(blocking_children[0]->responses[0]);
180
181 size_t new_alloc;
182 size_t slots_used;
183
184 slots_used = c->head_response - c->tail_response;
185 if (slots_used >= c->responses_alloc) {
186 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
187 c->responses = erealloc(c->responses, new_alloc * each);
188 c->tail_response = 0;
189 c->head_response = c->responses_alloc;
190 c->responses_alloc = new_alloc;
191 }
192 return (0 == slots_used);
193 }
194
195
196 /* --------------------------------------------------------------------
197 * queue_req_pointer() - append a work item or idle exit request to
198 * blocking_workitems[]. Employ proper locking.
199 */
200 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)201 queue_req_pointer(
202 blocking_child * c,
203 blocking_pipe_header * hdr
204 )
205 {
206 size_t qhead;
207
208 /* >>>> ACCESS LOCKING STARTS >>>> */
209 wait_for_sem(c->accesslock, NULL);
210 ensure_workitems_empty_slot(c);
211 qhead = c->head_workitem;
212 c->workitems[qhead % c->workitems_alloc] = hdr;
213 c->head_workitem = 1 + qhead;
214 tickle_sem(c->accesslock);
215 /* <<<< ACCESS LOCKING ENDS <<<< */
216
217 /* queue consumer wake-up notification */
218 tickle_sem(c->workitems_pending);
219
220 return 0;
221 }
222
223 /* --------------------------------------------------------------------
224 * API function to make sure a worker is running, a proper private copy
225 * of the data is made, the data eneterd into the queue and the worker
226 * is signalled.
227 */
228 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)229 send_blocking_req_internal(
230 blocking_child * c,
231 blocking_pipe_header * hdr,
232 void * data
233 )
234 {
235 blocking_pipe_header * threadcopy;
236 size_t payload_octets;
237
238 REQUIRE(hdr != NULL);
239 REQUIRE(data != NULL);
240 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
241
242 if (hdr->octets <= sizeof(*hdr))
243 return 1; /* failure */
244 payload_octets = hdr->octets - sizeof(*hdr);
245
246 if (NULL == c->thread_ref)
247 start_blocking_thread(c);
248 threadcopy = emalloc(hdr->octets);
249 memcpy(threadcopy, hdr, sizeof(*hdr));
250 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
251
252 return queue_req_pointer(c, threadcopy);
253 }
254
255 /* --------------------------------------------------------------------
256 * Wait for the 'incoming queue no longer empty' signal, lock the shared
257 * structure and dequeue an item.
258 */
259 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)260 receive_blocking_req_internal(
261 blocking_child * c
262 )
263 {
264 blocking_pipe_header * req;
265 size_t qhead, qtail;
266
267 req = NULL;
268 do {
269 /* wait for tickle from the producer side */
270 wait_for_sem(c->workitems_pending, NULL);
271
272 /* >>>> ACCESS LOCKING STARTS >>>> */
273 wait_for_sem(c->accesslock, NULL);
274 qhead = c->head_workitem;
275 do {
276 qtail = c->tail_workitem;
277 if (qhead == qtail)
278 break;
279 c->tail_workitem = qtail + 1;
280 qtail %= c->workitems_alloc;
281 req = c->workitems[qtail];
282 c->workitems[qtail] = NULL;
283 } while (NULL == req);
284 tickle_sem(c->accesslock);
285 /* <<<< ACCESS LOCKING ENDS <<<< */
286
287 } while (NULL == req);
288
289 INSIST(NULL != req);
290 if (CHILD_EXIT_REQ == req) { /* idled out */
291 send_blocking_resp_internal(c, CHILD_GONE_RESP);
292 req = NULL;
293 }
294
295 return req;
296 }
297
298 /* --------------------------------------------------------------------
299 * Push a response into the return queue and eventually tickle the
300 * receiver.
301 */
302 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)303 send_blocking_resp_internal(
304 blocking_child * c,
305 blocking_pipe_header * resp
306 )
307 {
308 size_t qhead;
309 int empty;
310
311 /* >>>> ACCESS LOCKING STARTS >>>> */
312 wait_for_sem(c->accesslock, NULL);
313 empty = ensure_workresp_empty_slot(c);
314 qhead = c->head_response;
315 c->responses[qhead % c->responses_alloc] = resp;
316 c->head_response = 1 + qhead;
317 tickle_sem(c->accesslock);
318 /* <<<< ACCESS LOCKING ENDS <<<< */
319
320 /* queue consumer wake-up notification */
321 if (empty)
322 {
323 # ifdef WORK_PIPE
324 write(c->resp_write_pipe, "", 1);
325 # else
326 tickle_sem(c->responses_pending);
327 # endif
328 }
329 return 0;
330 }
331
332
333 #ifndef WORK_PIPE
334
335 /* --------------------------------------------------------------------
336 * Check if a (Windows-)hanndle to a semaphore is actually the same we
337 * are using inside the sema wrapper.
338 */
339 static BOOL
same_os_sema(const sem_ref obj,void * osh)340 same_os_sema(
341 const sem_ref obj,
342 void* osh
343 )
344 {
345 return obj && osh && (obj->shnd == (HANDLE)osh);
346 }
347
348 /* --------------------------------------------------------------------
349 * Find the shared context that associates to an OS handle and make sure
350 * the data is dequeued and processed.
351 */
352 void
handle_blocking_resp_sem(void * context)353 handle_blocking_resp_sem(
354 void * context
355 )
356 {
357 blocking_child * c;
358 u_int idx;
359
360 c = NULL;
361 for (idx = 0; idx < blocking_children_alloc; idx++) {
362 c = blocking_children[idx];
363 if (c != NULL &&
364 c->thread_ref != NULL &&
365 same_os_sema(c->responses_pending, context))
366 break;
367 }
368 if (idx < blocking_children_alloc)
369 process_blocking_resp(c);
370 }
371 #endif /* !WORK_PIPE */
372
373 /* --------------------------------------------------------------------
374 * Fetch the next response from the return queue. In case of signalling
375 * via pipe, make sure the pipe is flushed, too.
376 */
377 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)378 receive_blocking_resp_internal(
379 blocking_child * c
380 )
381 {
382 blocking_pipe_header * removed;
383 size_t qhead, qtail, slot;
384
385 #ifdef WORK_PIPE
386 int rc;
387 char scratch[32];
388
389 do
390 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
391 while (-1 == rc && EINTR == errno);
392 #endif
393
394 /* >>>> ACCESS LOCKING STARTS >>>> */
395 wait_for_sem(c->accesslock, NULL);
396 qhead = c->head_response;
397 qtail = c->tail_response;
398 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
399 slot = qtail % c->responses_alloc;
400 removed = c->responses[slot];
401 c->responses[slot] = NULL;
402 }
403 c->tail_response = qtail;
404 tickle_sem(c->accesslock);
405 /* <<<< ACCESS LOCKING ENDS <<<< */
406
407 if (NULL != removed) {
408 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
409 BLOCKING_RESP_MAGIC == removed->magic_sig);
410 }
411 if (CHILD_GONE_RESP == removed) {
412 cleanup_after_child(c);
413 removed = NULL;
414 }
415
416 return removed;
417 }
418
419 /* --------------------------------------------------------------------
420 * Light up a new worker.
421 */
422 static void
start_blocking_thread(blocking_child * c)423 start_blocking_thread(
424 blocking_child * c
425 )
426 {
427
428 DEBUG_INSIST(!c->reusable);
429
430 prepare_child_sems(c);
431 start_blocking_thread_internal(c);
432 }
433
434 /* --------------------------------------------------------------------
435 * Create a worker thread. There are several differences between POSIX
436 * and Windows, of course -- most notably the Windows thread is no
437 * detached thread, and we keep the handle around until we want to get
438 * rid of the thread. The notification scheme also differs: Windows
439 * makes use of semaphores in both directions, POSIX uses a pipe for
440 * integration with 'select()' or alike.
441 */
442 static void
start_blocking_thread_internal(blocking_child * c)443 start_blocking_thread_internal(
444 blocking_child * c
445 )
446 #ifdef SYS_WINNT
447 {
448 BOOL resumed;
449
450 c->thread_ref = NULL;
451 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
452 c->thr_table[0].thnd =
453 (HANDLE)_beginthreadex(
454 NULL,
455 0,
456 &blocking_thread,
457 c,
458 CREATE_SUSPENDED,
459 NULL);
460
461 if (NULL == c->thr_table[0].thnd) {
462 msyslog(LOG_ERR, "start blocking thread failed: %m");
463 exit(-1);
464 }
465 /* remember the thread priority is only within the process class */
466 if (!SetThreadPriority(c->thr_table[0].thnd,
467 THREAD_PRIORITY_BELOW_NORMAL))
468 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
469
470 resumed = ResumeThread(c->thr_table[0].thnd);
471 DEBUG_INSIST(resumed);
472 c->thread_ref = &c->thr_table[0];
473 }
474 #else /* pthreads start_blocking_thread_internal() follows */
475 {
476 # ifdef NEED_PTHREAD_INIT
477 static int pthread_init_called;
478 # endif
479 pthread_attr_t thr_attr;
480 int rc;
481 int saved_errno;
482 int pipe_ends[2]; /* read then write */
483 int is_pipe;
484 int flags;
485 size_t stacksize;
486 sigset_t saved_sig_mask;
487
488 c->thread_ref = NULL;
489
490 # ifdef NEED_PTHREAD_INIT
491 /*
492 * from lib/isc/unix/app.c:
493 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
494 */
495 if (!pthread_init_called) {
496 pthread_init();
497 pthread_init_called = TRUE;
498 }
499 # endif
500
501 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
502 if (0 != rc) {
503 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
504 exit(1);
505 }
506 c->resp_read_pipe = move_fd(pipe_ends[0]);
507 c->resp_write_pipe = move_fd(pipe_ends[1]);
508 c->ispipe = is_pipe;
509 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
510 if (-1 == flags) {
511 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
512 exit(1);
513 }
514 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
515 if (-1 == rc) {
516 msyslog(LOG_ERR,
517 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
518 exit(1);
519 }
520 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
521 pthread_attr_init(&thr_attr);
522 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
523 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
524 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
525 rc = pthread_attr_getstacksize(&thr_attr, &stacksize);
526 if (-1 == rc) {
527 msyslog(LOG_ERR,
528 "start_blocking_thread: pthread_attr_getstacksize %m");
529 } else if (stacksize < THREAD_MINSTACKSIZE) {
530 rc = pthread_attr_setstacksize(&thr_attr,
531 THREAD_MINSTACKSIZE);
532 if (-1 == rc)
533 msyslog(LOG_ERR,
534 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) %m",
535 (u_long)stacksize,
536 (u_long)THREAD_MINSTACKSIZE);
537 }
538 #else
539 UNUSED_ARG(stacksize);
540 #endif
541 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
542 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
543 #endif
544 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
545 block_thread_signals(&saved_sig_mask);
546 rc = pthread_create(&c->thr_table[0], &thr_attr,
547 &blocking_thread, c);
548 saved_errno = errno;
549 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
550 pthread_attr_destroy(&thr_attr);
551 if (0 != rc) {
552 errno = saved_errno;
553 msyslog(LOG_ERR, "pthread_create() blocking child: %m");
554 exit(1);
555 }
556 c->thread_ref = &c->thr_table[0];
557 }
558 #endif
559
560 /* --------------------------------------------------------------------
561 * block_thread_signals()
562 *
563 * Temporarily block signals used by ntpd main thread, so that signal
564 * mask inherited by child threads leaves them blocked. Returns prior
565 * active signal mask via pmask, to be restored by the main thread
566 * after pthread_create().
567 */
568 #ifndef SYS_WINNT
569 void
block_thread_signals(sigset_t * pmask)570 block_thread_signals(
571 sigset_t * pmask
572 )
573 {
574 sigset_t block;
575
576 sigemptyset(&block);
577 # ifdef HAVE_SIGNALED_IO
578 # ifdef SIGIO
579 sigaddset(&block, SIGIO);
580 # endif
581 # ifdef SIGPOLL
582 sigaddset(&block, SIGPOLL);
583 # endif
584 # endif /* HAVE_SIGNALED_IO */
585 sigaddset(&block, SIGALRM);
586 sigaddset(&block, MOREDEBUGSIG);
587 sigaddset(&block, LESSDEBUGSIG);
588 # ifdef SIGDIE1
589 sigaddset(&block, SIGDIE1);
590 # endif
591 # ifdef SIGDIE2
592 sigaddset(&block, SIGDIE2);
593 # endif
594 # ifdef SIGDIE3
595 sigaddset(&block, SIGDIE3);
596 # endif
597 # ifdef SIGDIE4
598 sigaddset(&block, SIGDIE4);
599 # endif
600 # ifdef SIGBUS
601 sigaddset(&block, SIGBUS);
602 # endif
603 sigemptyset(pmask);
604 pthread_sigmask(SIG_BLOCK, &block, pmask);
605 }
606 #endif /* !SYS_WINNT */
607
608
609 /* --------------------------------------------------------------------
610 * Create & destroy semaphores. This is sufficiently different between
611 * POSIX and Windows to warrant wrapper functions and close enough to
612 * use the concept of synchronization via semaphore for all platforms.
613 */
614 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)615 create_sema(
616 sema_type* semptr,
617 u_int inival,
618 u_int maxval)
619 {
620 #ifdef SYS_WINNT
621
622 long svini, svmax;
623 if (NULL != semptr) {
624 svini = (inival < LONG_MAX)
625 ? (long)inival : LONG_MAX;
626 svmax = (maxval < LONG_MAX && maxval > 0)
627 ? (long)maxval : LONG_MAX;
628 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
629 if (NULL == semptr->shnd)
630 semptr = NULL;
631 }
632
633 #else
634
635 (void)maxval;
636 if (semptr && sem_init(semptr, FALSE, inival))
637 semptr = NULL;
638
639 #endif
640
641 return semptr;
642 }
643
644 /* ------------------------------------------------------------------ */
645 static sem_ref
delete_sema(sem_ref obj)646 delete_sema(
647 sem_ref obj)
648 {
649
650 # ifdef SYS_WINNT
651
652 if (obj) {
653 if (obj->shnd)
654 CloseHandle(obj->shnd);
655 obj->shnd = NULL;
656 }
657
658 # else
659
660 if (obj)
661 sem_destroy(obj);
662
663 # endif
664
665 return NULL;
666 }
667
668 /* --------------------------------------------------------------------
669 * prepare_child_sems()
670 *
671 * create sync & access semaphores
672 *
673 * All semaphores are cleared, only the access semaphore has 1 unit.
674 * Childs wait on 'workitems_pending', then grabs 'sema_access'
675 * and dequeues jobs. When done, 'sema_access' is given one unit back.
676 *
677 * The producer grabs 'sema_access', manages the queue, restores
678 * 'sema_access' and puts one unit into 'workitems_pending'.
679 *
680 * The story goes the same for the response queue.
681 */
682 static void
prepare_child_sems(blocking_child * c)683 prepare_child_sems(
684 blocking_child *c
685 )
686 {
687 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
688 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
689 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
690 # ifndef WORK_PIPE
691 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
692 # endif
693 }
694
695 /* --------------------------------------------------------------------
696 * wait for semaphore. Where the wait can be interrupted, it will
697 * internally resume -- When this function returns, there is either no
698 * semaphore at all, a timeout occurred, or the caller could
699 * successfully take a token from the semaphore.
700 *
701 * For untimed wait, not checking the result of this function at all is
702 * definitely an option.
703 */
704 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)705 wait_for_sem(
706 sem_ref sem,
707 struct timespec * timeout /* wall-clock */
708 )
709 #ifdef SYS_WINNT
710 {
711 struct timespec now;
712 struct timespec delta;
713 DWORD msec;
714 DWORD rc;
715
716 if (!(sem && sem->shnd)) {
717 errno = EINVAL;
718 return -1;
719 }
720
721 if (NULL == timeout) {
722 msec = INFINITE;
723 } else {
724 getclock(TIMEOFDAY, &now);
725 delta = sub_tspec(*timeout, now);
726 if (delta.tv_sec < 0) {
727 msec = 0;
728 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
729 msec = INFINITE;
730 } else {
731 msec = 1000 * (DWORD)delta.tv_sec;
732 msec += delta.tv_nsec / (1000 * 1000);
733 }
734 }
735 rc = WaitForSingleObject(sem->shnd, msec);
736 if (WAIT_OBJECT_0 == rc)
737 return 0;
738 if (WAIT_TIMEOUT == rc) {
739 errno = ETIMEDOUT;
740 return -1;
741 }
742 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
743 errno = EFAULT;
744 return -1;
745 }
746 #else /* pthreads wait_for_sem() follows */
747 {
748 int rc = -1;
749
750 if (sem) do {
751 if (NULL == timeout)
752 rc = sem_wait(sem);
753 else
754 rc = sem_timedwait(sem, timeout);
755 } while (rc == -1 && errno == EINTR);
756 else
757 errno = EINVAL;
758
759 return rc;
760 }
761 #endif
762
763 /* --------------------------------------------------------------------
764 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
765 * calling conventions under Windows and POSIX-defined signature
766 * otherwise.
767 */
768 #ifdef SYS_WINNT
769 u_int WINAPI
770 #else
771 void *
772 #endif
blocking_thread(void * ThreadArg)773 blocking_thread(
774 void * ThreadArg
775 )
776 {
777 blocking_child *c;
778
779 c = ThreadArg;
780 exit_worker(blocking_child_common(c));
781
782 /* NOTREACHED */
783 return 0;
784 }
785
786 /* --------------------------------------------------------------------
787 * req_child_exit() runs in the parent.
788 *
789 * This function is called from from the idle timer, too, and possibly
790 * without a thread being there any longer. Since we have folded up our
791 * tent in that case and all the semaphores are already gone, we simply
792 * ignore this request in this case.
793 *
794 * Since the existence of the semaphores is controlled exclusively by
795 * the parent, there's no risk of data race here.
796 */
797 int
req_child_exit(blocking_child * c)798 req_child_exit(
799 blocking_child *c
800 )
801 {
802 return (c->accesslock)
803 ? queue_req_pointer(c, CHILD_EXIT_REQ)
804 : 0;
805 }
806
807 /* --------------------------------------------------------------------
808 * cleanup_after_child() runs in parent.
809 */
810 static void
cleanup_after_child(blocking_child * c)811 cleanup_after_child(
812 blocking_child * c
813 )
814 {
815 DEBUG_INSIST(!c->reusable);
816
817 # ifdef SYS_WINNT
818 /* The thread was not created in detached state, so we better
819 * clean up.
820 */
821 if (c->thread_ref && c->thread_ref->thnd) {
822 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
823 INSIST(CloseHandle(c->thread_ref->thnd));
824 c->thread_ref->thnd = NULL;
825 }
826 # endif
827 c->thread_ref = NULL;
828
829 /* remove semaphores and (if signalling vi IO) pipes */
830
831 c->accesslock = delete_sema(c->accesslock);
832 c->workitems_pending = delete_sema(c->workitems_pending);
833 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
834
835 # ifdef WORK_PIPE
836 DEBUG_INSIST(-1 != c->resp_read_pipe);
837 DEBUG_INSIST(-1 != c->resp_write_pipe);
838 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
839 close(c->resp_write_pipe);
840 close(c->resp_read_pipe);
841 c->resp_write_pipe = -1;
842 c->resp_read_pipe = -1;
843 # else
844 DEBUG_INSIST(NULL != c->responses_pending);
845 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
846 c->responses_pending = delete_sema(c->responses_pending);
847 # endif
848
849 /* Is it necessary to check if there are pending requests and
850 * responses? If so, and if there are, what to do with them?
851 */
852
853 /* re-init buffer index sequencers */
854 c->head_workitem = 0;
855 c->tail_workitem = 0;
856 c->head_response = 0;
857 c->tail_response = 0;
858
859 c->reusable = TRUE;
860 }
861
862
863 #else /* !WORK_THREAD follows */
864 char work_thread_nonempty_compilation_unit;
865 #endif
866