1 /*
2 * work_thread.c - threads implementation for blocking worker child.
3 */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORK_THREAD
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25
/* Sentinel pointer values that travel through the queues in place of a
 * real message: CHILD_EXIT_REQ is queued by req_child_exit() to ask the
 * worker to quit, CHILD_GONE_RESP is what the worker sends back when it
 * dequeues that request. Both use the same bit pattern.
 */
#define CHILD_EXIT_REQ	((blocking_pipe_header *)(intptr_t)-1)
#define CHILD_GONE_RESP	CHILD_EXIT_REQ
/* Queue size increments:
 * The request queue grows a bit faster than the response queue -- the
 * daemon can push requests and pull results faster on average than the
 * worker can process requests and push results... If this really pays
 * off is debatable.
 */
#define WORKITEMS_ALLOC_INC	16
#define RESPONSES_ALLOC_INC	4

/* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
 * set the maximum to 256kB. If the minimum goes below the
 * system-defined minimum stack size, we have to adjust accordingly.
 */
#ifndef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE	(64U * 1024)
#endif
/* NOTE(review): the __sun exclusion presumably exists because
 * PTHREAD_STACK_MIN cannot be evaluated by the preprocessor there --
 * confirm before touching this conditional.
 */
#ifndef __sun
#if defined(PTHREAD_STACK_MIN) && THREAD_MINSTACKSIZE < PTHREAD_STACK_MIN
# undef THREAD_MINSTACKSIZE
# define THREAD_MINSTACKSIZE PTHREAD_STACK_MIN
#endif
#endif

#ifndef THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE	(256U * 1024)
#endif
/* the maximum must never be smaller than the (possibly raised) minimum */
#if THREAD_MAXSTACKSIZE < THREAD_MINSTACKSIZE
# undef  THREAD_MAXSTACKSIZE
# define THREAD_MAXSTACKSIZE THREAD_MINSTACKSIZE
#endif
58
59
#ifdef SYS_WINNT

/* Windows flavour: threads end via _endthreadex(), a semaphore is
 * released through the HANDLE stored in the wrapper's 'shnd' member.
 */
# define thread_exit(c)	_endthreadex(c)
# define tickle_sem(sh)	ReleaseSemaphore((sh->shnd), 1, NULL)
u_int	WINAPI	blocking_thread(void *);
static BOOL	same_os_sema(const sem_ref obj, void * osobj);

#else

/* POSIX flavour: pthread_exit() carries the exit code as a pointer
 * value, semaphores are posted directly with sem_post().
 */
# define thread_exit(c)	pthread_exit((void*)(size_t)(c))
# define tickle_sem	sem_post
void *		blocking_thread(void *);
static	void	block_thread_signals(sigset_t *);

#endif

/* Callback used by this file to register (on thread start) and
 * unregister (on cleanup) the response notification channel: a pipe
 * descriptor with WORK_PIPE, a semaphore handle otherwise. It is not
 * assigned anywhere in this file.
 */
#ifdef WORK_PIPE
addremove_io_fd_func		addremove_io_fd;
#else
addremove_io_semaphore_func	addremove_io_semaphore;
#endif

/* forward declarations of the file-local helpers */
static	void	start_blocking_thread(blocking_child *);
static	void	start_blocking_thread_internal(blocking_child *);
static	void	prepare_child_sems(blocking_child *);
static	int	wait_for_sem(sem_ref, struct timespec *);
static	int	ensure_workitems_empty_slot(blocking_child *);
static	int	ensure_workresp_empty_slot(blocking_child *);
static	int	queue_req_pointer(blocking_child *, blocking_pipe_header *);
static	void	cleanup_after_child(blocking_child *);

/* Storage for, and reference to, the global lock used by
 * worker_global_lock(). The reference stays NULL until
 * prepare_child_sems() creates the semaphore.
 */
static sema_type	worker_mmutex;
static sem_ref		worker_memlock;
93
94 /* --------------------------------------------------------------------
95 * locking the global worker state table (and other global stuff)
96 */
97 void
worker_global_lock(int inOrOut)98 worker_global_lock(
99 int inOrOut)
100 {
101 if (worker_memlock) {
102 if (inOrOut)
103 wait_for_sem(worker_memlock, NULL);
104 else
105 tickle_sem(worker_memlock);
106 }
107 }
108
/* --------------------------------------------------------------------
 * Implementation isolation wrapper: terminate the calling worker
 * thread, handing 'exitcode' to the platform's thread-exit primitive.
 */
void
exit_worker(
	int	exitcode
	)
{
	/* thread_exit() is a macro, see its platform-specific #define */
	thread_exit(exitcode);
}
119
120 /* --------------------------------------------------------------------
121 * sleep for a given time or until the wakup semaphore is tickled.
122 */
123 int
worker_sleep(blocking_child * c,time_t seconds)124 worker_sleep(
125 blocking_child * c,
126 time_t seconds
127 )
128 {
129 struct timespec until;
130 int rc;
131
132 # ifdef HAVE_CLOCK_GETTIME
133 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
134 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
135 return -1;
136 }
137 # else
138 if (0 != getclock(TIMEOFDAY, &until)) {
139 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
140 return -1;
141 }
142 # endif
143 until.tv_sec += seconds;
144 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
145 if (0 == rc)
146 return -1;
147 if (-1 == rc && ETIMEDOUT == errno)
148 return 0;
149 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
150 return -1;
151 }
152
153
154 /* --------------------------------------------------------------------
155 * Wake up a worker that takes a nap.
156 */
157 void
interrupt_worker_sleep(void)158 interrupt_worker_sleep(void)
159 {
160 u_int idx;
161 blocking_child * c;
162
163 for (idx = 0; idx < blocking_children_alloc; idx++) {
164 c = blocking_children[idx];
165 if (NULL == c || NULL == c->wake_scheduled_sleep)
166 continue;
167 tickle_sem(c->wake_scheduled_sleep);
168 }
169 }
170
171 /* --------------------------------------------------------------------
172 * Make sure there is an empty slot at the head of the request
173 * queue. Tell if the queue is currently empty.
174 */
175 static int
ensure_workitems_empty_slot(blocking_child * c)176 ensure_workitems_empty_slot(
177 blocking_child *c
178 )
179 {
180 /*
181 ** !!! PRECONDITION: caller holds access lock!
182 **
183 ** This simply tries to increase the size of the buffer if it
184 ** becomes full. The resize operation does *not* maintain the
185 ** order of requests, but that should be irrelevant since the
186 ** processing is considered asynchronous anyway.
187 **
188 ** Return if the buffer is currently empty.
189 */
190
191 static const size_t each =
192 sizeof(blocking_children[0]->workitems[0]);
193
194 size_t new_alloc;
195 size_t slots_used;
196 size_t sidx;
197
198 slots_used = c->head_workitem - c->tail_workitem;
199 if (slots_used >= c->workitems_alloc) {
200 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
201 c->workitems = erealloc(c->workitems, new_alloc * each);
202 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
203 c->workitems[sidx] = NULL;
204 c->tail_workitem = 0;
205 c->head_workitem = c->workitems_alloc;
206 c->workitems_alloc = new_alloc;
207 }
208 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
209 return (0 == slots_used);
210 }
211
212 /* --------------------------------------------------------------------
213 * Make sure there is an empty slot at the head of the response
214 * queue. Tell if the queue is currently empty.
215 */
216 static int
ensure_workresp_empty_slot(blocking_child * c)217 ensure_workresp_empty_slot(
218 blocking_child *c
219 )
220 {
221 /*
222 ** !!! PRECONDITION: caller holds access lock!
223 **
224 ** Works like the companion function above.
225 */
226
227 static const size_t each =
228 sizeof(blocking_children[0]->responses[0]);
229
230 size_t new_alloc;
231 size_t slots_used;
232 size_t sidx;
233
234 slots_used = c->head_response - c->tail_response;
235 if (slots_used >= c->responses_alloc) {
236 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
237 c->responses = erealloc(c->responses, new_alloc * each);
238 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
239 c->responses[sidx] = NULL;
240 c->tail_response = 0;
241 c->head_response = c->responses_alloc;
242 c->responses_alloc = new_alloc;
243 }
244 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
245 return (0 == slots_used);
246 }
247
248
249 /* --------------------------------------------------------------------
250 * queue_req_pointer() - append a work item or idle exit request to
251 * blocking_workitems[]. Employ proper locking.
252 */
253 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)254 queue_req_pointer(
255 blocking_child * c,
256 blocking_pipe_header * hdr
257 )
258 {
259 size_t qhead;
260
261 /* >>>> ACCESS LOCKING STARTS >>>> */
262 wait_for_sem(c->accesslock, NULL);
263 ensure_workitems_empty_slot(c);
264 qhead = c->head_workitem;
265 c->workitems[qhead % c->workitems_alloc] = hdr;
266 c->head_workitem = 1 + qhead;
267 tickle_sem(c->accesslock);
268 /* <<<< ACCESS LOCKING ENDS <<<< */
269
270 /* queue consumer wake-up notification */
271 tickle_sem(c->workitems_pending);
272
273 return 0;
274 }
275
276 /* --------------------------------------------------------------------
277 * API function to make sure a worker is running, a proper private copy
278 * of the data is made, the data eneterd into the queue and the worker
279 * is signalled.
280 */
281 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)282 send_blocking_req_internal(
283 blocking_child * c,
284 blocking_pipe_header * hdr,
285 void * data
286 )
287 {
288 blocking_pipe_header * threadcopy;
289 size_t payload_octets;
290
291 REQUIRE(hdr != NULL);
292 REQUIRE(data != NULL);
293 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
294
295 if (hdr->octets <= sizeof(*hdr))
296 return 1; /* failure */
297 payload_octets = hdr->octets - sizeof(*hdr);
298
299 if (NULL == c->thread_ref)
300 start_blocking_thread(c);
301 threadcopy = emalloc(hdr->octets);
302 memcpy(threadcopy, hdr, sizeof(*hdr));
303 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
304
305 return queue_req_pointer(c, threadcopy);
306 }
307
308 /* --------------------------------------------------------------------
309 * Wait for the 'incoming queue no longer empty' signal, lock the shared
310 * structure and dequeue an item.
311 */
312 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)313 receive_blocking_req_internal(
314 blocking_child * c
315 )
316 {
317 blocking_pipe_header * req;
318 size_t qhead, qtail;
319
320 req = NULL;
321 do {
322 /* wait for tickle from the producer side */
323 wait_for_sem(c->workitems_pending, NULL);
324
325 /* >>>> ACCESS LOCKING STARTS >>>> */
326 wait_for_sem(c->accesslock, NULL);
327 qhead = c->head_workitem;
328 do {
329 qtail = c->tail_workitem;
330 if (qhead == qtail)
331 break;
332 c->tail_workitem = qtail + 1;
333 qtail %= c->workitems_alloc;
334 req = c->workitems[qtail];
335 c->workitems[qtail] = NULL;
336 } while (NULL == req);
337 tickle_sem(c->accesslock);
338 /* <<<< ACCESS LOCKING ENDS <<<< */
339
340 } while (NULL == req);
341
342 INSIST(NULL != req);
343 if (CHILD_EXIT_REQ == req) { /* idled out */
344 send_blocking_resp_internal(c, CHILD_GONE_RESP);
345 req = NULL;
346 }
347
348 return req;
349 }
350
351 /* --------------------------------------------------------------------
352 * Push a response into the return queue and eventually tickle the
353 * receiver.
354 */
355 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)356 send_blocking_resp_internal(
357 blocking_child * c,
358 blocking_pipe_header * resp
359 )
360 {
361 size_t qhead;
362 int empty;
363
364 /* >>>> ACCESS LOCKING STARTS >>>> */
365 wait_for_sem(c->accesslock, NULL);
366 empty = ensure_workresp_empty_slot(c);
367 qhead = c->head_response;
368 c->responses[qhead % c->responses_alloc] = resp;
369 c->head_response = 1 + qhead;
370 tickle_sem(c->accesslock);
371 /* <<<< ACCESS LOCKING ENDS <<<< */
372
373 /* queue consumer wake-up notification */
374 if (empty)
375 {
376 # ifdef WORK_PIPE
377 write(c->resp_write_pipe, "", 1);
378 # else
379 tickle_sem(c->responses_pending);
380 # endif
381 }
382 return 0;
383 }
384
385
386 #ifndef WORK_PIPE
387
388 /* --------------------------------------------------------------------
389 * Check if a (Windows-)hanndle to a semaphore is actually the same we
390 * are using inside the sema wrapper.
391 */
392 static BOOL
same_os_sema(const sem_ref obj,void * osh)393 same_os_sema(
394 const sem_ref obj,
395 void* osh
396 )
397 {
398 return obj && osh && (obj->shnd == (HANDLE)osh);
399 }
400
401 /* --------------------------------------------------------------------
402 * Find the shared context that associates to an OS handle and make sure
403 * the data is dequeued and processed.
404 */
405 void
handle_blocking_resp_sem(void * context)406 handle_blocking_resp_sem(
407 void * context
408 )
409 {
410 blocking_child * c;
411 u_int idx;
412
413 c = NULL;
414 for (idx = 0; idx < blocking_children_alloc; idx++) {
415 c = blocking_children[idx];
416 if (c != NULL &&
417 c->thread_ref != NULL &&
418 same_os_sema(c->responses_pending, context))
419 break;
420 }
421 if (idx < blocking_children_alloc)
422 process_blocking_resp(c);
423 }
424 #endif /* !WORK_PIPE */
425
426 /* --------------------------------------------------------------------
427 * Fetch the next response from the return queue. In case of signalling
428 * via pipe, make sure the pipe is flushed, too.
429 */
430 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)431 receive_blocking_resp_internal(
432 blocking_child * c
433 )
434 {
435 blocking_pipe_header * removed;
436 size_t qhead, qtail, slot;
437
438 #ifdef WORK_PIPE
439 int rc;
440 char scratch[32];
441
442 do
443 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
444 while (-1 == rc && EINTR == errno);
445 #endif
446
447 /* >>>> ACCESS LOCKING STARTS >>>> */
448 wait_for_sem(c->accesslock, NULL);
449 qhead = c->head_response;
450 qtail = c->tail_response;
451 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
452 slot = qtail % c->responses_alloc;
453 removed = c->responses[slot];
454 c->responses[slot] = NULL;
455 }
456 c->tail_response = qtail;
457 tickle_sem(c->accesslock);
458 /* <<<< ACCESS LOCKING ENDS <<<< */
459
460 if (NULL != removed) {
461 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
462 BLOCKING_RESP_MAGIC == removed->magic_sig);
463 }
464 if (CHILD_GONE_RESP == removed) {
465 cleanup_after_child(c);
466 removed = NULL;
467 }
468
469 return removed;
470 }
471
472 /* --------------------------------------------------------------------
473 * Light up a new worker.
474 */
475 static void
start_blocking_thread(blocking_child * c)476 start_blocking_thread(
477 blocking_child * c
478 )
479 {
480
481 DEBUG_INSIST(!c->reusable);
482
483 prepare_child_sems(c);
484 start_blocking_thread_internal(c);
485 }
486
487 /* --------------------------------------------------------------------
488 * Create a worker thread. There are several differences between POSIX
489 * and Windows, of course -- most notably the Windows thread is no
490 * detached thread, and we keep the handle around until we want to get
491 * rid of the thread. The notification scheme also differs: Windows
492 * makes use of semaphores in both directions, POSIX uses a pipe for
493 * integration with 'select()' or alike.
494 */
495 static void
start_blocking_thread_internal(blocking_child * c)496 start_blocking_thread_internal(
497 blocking_child * c
498 )
499 #ifdef SYS_WINNT
500 {
501 BOOL resumed;
502
503 c->thread_ref = NULL;
504 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
505 c->thr_table[0].thnd =
506 (HANDLE)_beginthreadex(
507 NULL,
508 0,
509 &blocking_thread,
510 c,
511 CREATE_SUSPENDED,
512 NULL);
513
514 if (NULL == c->thr_table[0].thnd) {
515 msyslog(LOG_ERR, "start blocking thread failed: %m");
516 exit(-1);
517 }
518 /* remember the thread priority is only within the process class */
519 if (!SetThreadPriority(c->thr_table[0].thnd,
520 THREAD_PRIORITY_BELOW_NORMAL))
521 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
522
523 resumed = ResumeThread(c->thr_table[0].thnd);
524 DEBUG_INSIST(resumed);
525 c->thread_ref = &c->thr_table[0];
526 }
527 #else /* pthreads start_blocking_thread_internal() follows */
528 {
529 # ifdef NEED_PTHREAD_INIT
530 static int pthread_init_called;
531 # endif
532 pthread_attr_t thr_attr;
533 int rc;
534 int pipe_ends[2]; /* read then write */
535 int is_pipe;
536 int flags;
537 size_t ostacksize;
538 size_t nstacksize;
539 sigset_t saved_sig_mask;
540
541 c->thread_ref = NULL;
542
543 # ifdef NEED_PTHREAD_INIT
544 /*
545 * from lib/isc/unix/app.c:
546 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
547 */
548 if (!pthread_init_called) {
549 pthread_init();
550 pthread_init_called = TRUE;
551 }
552 # endif
553
554 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
555 if (0 != rc) {
556 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
557 exit(1);
558 }
559 c->resp_read_pipe = move_fd(pipe_ends[0]);
560 c->resp_write_pipe = move_fd(pipe_ends[1]);
561 c->ispipe = is_pipe;
562 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
563 if (-1 == flags) {
564 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
565 exit(1);
566 }
567 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
568 if (-1 == rc) {
569 msyslog(LOG_ERR,
570 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
571 exit(1);
572 }
573 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
574 pthread_attr_init(&thr_attr);
575 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
576 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
577 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
578 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
579 if (0 != rc) {
580 msyslog(LOG_ERR,
581 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
582 strerror(rc));
583 } else {
584 if (ostacksize < THREAD_MINSTACKSIZE)
585 nstacksize = THREAD_MINSTACKSIZE;
586 else if (ostacksize > THREAD_MAXSTACKSIZE)
587 nstacksize = THREAD_MAXSTACKSIZE;
588 else
589 nstacksize = ostacksize;
590 if (nstacksize != ostacksize)
591 rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
592 if (0 != rc)
593 msyslog(LOG_ERR,
594 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
595 (u_long)ostacksize, (u_long)nstacksize,
596 strerror(rc));
597 }
598 #else
599 UNUSED_ARG(nstacksize);
600 UNUSED_ARG(ostacksize);
601 #endif
602 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
603 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
604 #endif
605 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
606 block_thread_signals(&saved_sig_mask);
607 rc = pthread_create(&c->thr_table[0], &thr_attr,
608 &blocking_thread, c);
609 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
610 pthread_attr_destroy(&thr_attr);
611 if (0 != rc) {
612 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
613 strerror(rc));
614 exit(1);
615 }
616 c->thread_ref = &c->thr_table[0];
617 }
618 #endif
619
620 /* --------------------------------------------------------------------
621 * block_thread_signals()
622 *
623 * Temporarily block signals used by ntpd main thread, so that signal
624 * mask inherited by child threads leaves them blocked. Returns prior
625 * active signal mask via pmask, to be restored by the main thread
626 * after pthread_create().
627 */
628 #ifndef SYS_WINNT
629 void
block_thread_signals(sigset_t * pmask)630 block_thread_signals(
631 sigset_t * pmask
632 )
633 {
634 sigset_t block;
635
636 sigemptyset(&block);
637 # ifdef HAVE_SIGNALED_IO
638 # ifdef SIGIO
639 sigaddset(&block, SIGIO);
640 # endif
641 # ifdef SIGPOLL
642 sigaddset(&block, SIGPOLL);
643 # endif
644 # endif /* HAVE_SIGNALED_IO */
645 sigaddset(&block, SIGALRM);
646 sigaddset(&block, MOREDEBUGSIG);
647 sigaddset(&block, LESSDEBUGSIG);
648 # ifdef SIGDIE1
649 sigaddset(&block, SIGDIE1);
650 # endif
651 # ifdef SIGDIE2
652 sigaddset(&block, SIGDIE2);
653 # endif
654 # ifdef SIGDIE3
655 sigaddset(&block, SIGDIE3);
656 # endif
657 # ifdef SIGDIE4
658 sigaddset(&block, SIGDIE4);
659 # endif
660 # ifdef SIGBUS
661 sigaddset(&block, SIGBUS);
662 # endif
663 sigemptyset(pmask);
664 pthread_sigmask(SIG_BLOCK, &block, pmask);
665 }
666 #endif /* !SYS_WINNT */
667
668
669 /* --------------------------------------------------------------------
670 * Create & destroy semaphores. This is sufficiently different between
671 * POSIX and Windows to warrant wrapper functions and close enough to
672 * use the concept of synchronization via semaphore for all platforms.
673 */
674 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)675 create_sema(
676 sema_type* semptr,
677 u_int inival,
678 u_int maxval)
679 {
680 #ifdef SYS_WINNT
681
682 long svini, svmax;
683 if (NULL != semptr) {
684 svini = (inival < LONG_MAX)
685 ? (long)inival : LONG_MAX;
686 svmax = (maxval < LONG_MAX && maxval > 0)
687 ? (long)maxval : LONG_MAX;
688 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
689 if (NULL == semptr->shnd)
690 semptr = NULL;
691 }
692
693 #else
694
695 (void)maxval;
696 if (semptr && sem_init(semptr, FALSE, inival))
697 semptr = NULL;
698
699 #endif
700
701 return semptr;
702 }
703
704 /* ------------------------------------------------------------------ */
705 static sem_ref
delete_sema(sem_ref obj)706 delete_sema(
707 sem_ref obj)
708 {
709
710 # ifdef SYS_WINNT
711
712 if (obj) {
713 if (obj->shnd)
714 CloseHandle(obj->shnd);
715 obj->shnd = NULL;
716 }
717
718 # else
719
720 if (obj)
721 sem_destroy(obj);
722
723 # endif
724
725 return NULL;
726 }
727
728 /* --------------------------------------------------------------------
729 * prepare_child_sems()
730 *
731 * create sync & access semaphores
732 *
733 * All semaphores are cleared, only the access semaphore has 1 unit.
734 * Childs wait on 'workitems_pending', then grabs 'sema_access'
735 * and dequeues jobs. When done, 'sema_access' is given one unit back.
736 *
737 * The producer grabs 'sema_access', manages the queue, restores
738 * 'sema_access' and puts one unit into 'workitems_pending'.
739 *
740 * The story goes the same for the response queue.
741 */
742 static void
prepare_child_sems(blocking_child * c)743 prepare_child_sems(
744 blocking_child *c
745 )
746 {
747 if (NULL == worker_memlock)
748 worker_memlock = create_sema(&worker_mmutex, 1, 1);
749
750 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
751 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
752 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
753 # ifndef WORK_PIPE
754 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
755 # endif
756 }
757
758 /* --------------------------------------------------------------------
759 * wait for semaphore. Where the wait can be interrupted, it will
760 * internally resume -- When this function returns, there is either no
761 * semaphore at all, a timeout occurred, or the caller could
762 * successfully take a token from the semaphore.
763 *
764 * For untimed wait, not checking the result of this function at all is
765 * definitely an option.
766 */
767 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)768 wait_for_sem(
769 sem_ref sem,
770 struct timespec * timeout /* wall-clock */
771 )
772 #ifdef SYS_WINNT
773 {
774 struct timespec now;
775 struct timespec delta;
776 DWORD msec;
777 DWORD rc;
778
779 if (!(sem && sem->shnd)) {
780 errno = EINVAL;
781 return -1;
782 }
783
784 if (NULL == timeout) {
785 msec = INFINITE;
786 } else {
787 getclock(TIMEOFDAY, &now);
788 delta = sub_tspec(*timeout, now);
789 if (delta.tv_sec < 0) {
790 msec = 0;
791 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
792 msec = INFINITE;
793 } else {
794 msec = 1000 * (DWORD)delta.tv_sec;
795 msec += delta.tv_nsec / (1000 * 1000);
796 }
797 }
798 rc = WaitForSingleObject(sem->shnd, msec);
799 if (WAIT_OBJECT_0 == rc)
800 return 0;
801 if (WAIT_TIMEOUT == rc) {
802 errno = ETIMEDOUT;
803 return -1;
804 }
805 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
806 errno = EFAULT;
807 return -1;
808 }
809 #else /* pthreads wait_for_sem() follows */
810 {
811 int rc = -1;
812
813 if (sem) do {
814 if (NULL == timeout)
815 rc = sem_wait(sem);
816 else
817 rc = sem_timedwait(sem, timeout);
818 } while (rc == -1 && errno == EINTR);
819 else
820 errno = EINVAL;
821
822 return rc;
823 }
824 #endif
825
826 /* --------------------------------------------------------------------
827 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
828 * calling conventions under Windows and POSIX-defined signature
829 * otherwise.
830 */
831 #ifdef SYS_WINNT
832 u_int WINAPI
833 #else
834 void *
835 #endif
blocking_thread(void * ThreadArg)836 blocking_thread(
837 void * ThreadArg
838 )
839 {
840 blocking_child *c;
841
842 c = ThreadArg;
843 exit_worker(blocking_child_common(c));
844
845 /* NOTREACHED */
846 return 0;
847 }
848
849 /* --------------------------------------------------------------------
850 * req_child_exit() runs in the parent.
851 *
852 * This function is called from from the idle timer, too, and possibly
853 * without a thread being there any longer. Since we have folded up our
854 * tent in that case and all the semaphores are already gone, we simply
855 * ignore this request in this case.
856 *
857 * Since the existence of the semaphores is controlled exclusively by
858 * the parent, there's no risk of data race here.
859 */
860 int
req_child_exit(blocking_child * c)861 req_child_exit(
862 blocking_child *c
863 )
864 {
865 return (c->accesslock)
866 ? queue_req_pointer(c, CHILD_EXIT_REQ)
867 : 0;
868 }
869
870 /* --------------------------------------------------------------------
871 * cleanup_after_child() runs in parent.
872 */
873 static void
cleanup_after_child(blocking_child * c)874 cleanup_after_child(
875 blocking_child * c
876 )
877 {
878 DEBUG_INSIST(!c->reusable);
879
880 # ifdef SYS_WINNT
881 /* The thread was not created in detached state, so we better
882 * clean up.
883 */
884 if (c->thread_ref && c->thread_ref->thnd) {
885 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
886 INSIST(CloseHandle(c->thread_ref->thnd));
887 c->thread_ref->thnd = NULL;
888 }
889 # endif
890 c->thread_ref = NULL;
891
892 /* remove semaphores and (if signalling vi IO) pipes */
893
894 c->accesslock = delete_sema(c->accesslock);
895 c->workitems_pending = delete_sema(c->workitems_pending);
896 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
897
898 # ifdef WORK_PIPE
899 DEBUG_INSIST(-1 != c->resp_read_pipe);
900 DEBUG_INSIST(-1 != c->resp_write_pipe);
901 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
902 close(c->resp_write_pipe);
903 close(c->resp_read_pipe);
904 c->resp_write_pipe = -1;
905 c->resp_read_pipe = -1;
906 # else
907 DEBUG_INSIST(NULL != c->responses_pending);
908 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
909 c->responses_pending = delete_sema(c->responses_pending);
910 # endif
911
912 /* Is it necessary to check if there are pending requests and
913 * responses? If so, and if there are, what to do with them?
914 */
915
916 /* re-init buffer index sequencers */
917 c->head_workitem = 0;
918 c->tail_workitem = 0;
919 c->head_response = 0;
920 c->tail_response = 0;
921
922 c->reusable = TRUE;
923 }
924
925
#else	/* !WORK_THREAD follows */
/* Without WORK_THREAD nothing above is compiled; as its name says,
 * this dummy object keeps the compilation unit from being empty.
 */
char work_thread_nonempty_compilation_unit;
#endif
929