1 /*
2 * work_thread.c - threads implementation for blocking worker child.
3 */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORK_THREAD
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12 #ifndef SYS_WINNT
13 #include <pthread.h>
14 #endif
15
16 #include "ntp_stdlib.h"
17 #include "ntp_malloc.h"
18 #include "ntp_syslog.h"
19 #include "ntpd.h"
20 #include "ntp_io.h"
21 #include "ntp_assert.h"
22 #include "ntp_unixtime.h"
23 #include "timespecops.h"
24 #include "ntp_worker.h"
25
26 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
27 #define CHILD_GONE_RESP CHILD_EXIT_REQ
28 /* Queue size increments:
29 * The request queue grows a bit faster than the response queue -- the
30 * daemon can push requests and pull results faster on avarage than the
31 * worker can process requests and push results... If this really pays
32 * off is debatable.
33 */
34 #define WORKITEMS_ALLOC_INC 16
35 #define RESPONSES_ALLOC_INC 4
36
37 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
38 * set the maximum to 256kB. If the minimum goes below the
39 * system-defined minimum stack size, we have to adjust accordingly.
40 */
41 #ifndef THREAD_MINSTACKSIZE
42 # define THREAD_MINSTACKSIZE (64U * 1024)
43 #endif
44
45 #ifndef THREAD_MAXSTACKSIZE
46 # define THREAD_MAXSTACKSIZE (256U * 1024)
47 #endif
48
49 /* need a good integer to store a pointer... */
50 #ifndef UINTPTR_T
51 # if defined(UINTPTR_MAX)
52 # define UINTPTR_T uintptr_t
53 # elif defined(UINT_PTR)
54 # define UINTPTR_T UINT_PTR
55 # else
56 # define UINTPTR_T size_t
57 # endif
58 #endif
59
60
61 #ifdef SYS_WINNT
62
63 # define thread_exit(c) _endthreadex(c)
64 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
65 u_int WINAPI blocking_thread(void *);
66 static BOOL same_os_sema(const sem_ref obj, void * osobj);
67
68 #else
69
70 # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
71 # define tickle_sem sem_post
72 void * blocking_thread(void *);
73 static void block_thread_signals(sigset_t *);
74
75 #endif
76
77 #ifdef WORK_PIPE
78 addremove_io_fd_func addremove_io_fd;
79 #else
80 addremove_io_semaphore_func addremove_io_semaphore;
81 #endif
82
83 static void start_blocking_thread(blocking_child *);
84 static void start_blocking_thread_internal(blocking_child *);
85 static void prepare_child_sems(blocking_child *);
86 static int wait_for_sem(sem_ref, struct timespec *);
87 static int ensure_workitems_empty_slot(blocking_child *);
88 static int ensure_workresp_empty_slot(blocking_child *);
89 static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
90 static void cleanup_after_child(blocking_child *);
91
92 static sema_type worker_mmutex;
93 static sem_ref worker_memlock;
94
95 /* --------------------------------------------------------------------
96 * locking the global worker state table (and other global stuff)
97 */
98 void
worker_global_lock(int inOrOut)99 worker_global_lock(
100 int inOrOut)
101 {
102 if (worker_memlock) {
103 if (inOrOut)
104 wait_for_sem(worker_memlock, NULL);
105 else
106 tickle_sem(worker_memlock);
107 }
108 }
109
110 /* --------------------------------------------------------------------
111 * implementation isolation wrapper
112 */
113 void
exit_worker(int exitcode)114 exit_worker(
115 int exitcode
116 )
117 {
118 thread_exit(exitcode); /* see #define thread_exit */
119 }
120
121 /* --------------------------------------------------------------------
122 * sleep for a given time or until the wakup semaphore is tickled.
123 */
124 int
worker_sleep(blocking_child * c,time_t seconds)125 worker_sleep(
126 blocking_child * c,
127 time_t seconds
128 )
129 {
130 struct timespec until;
131 int rc;
132
133 # ifdef HAVE_CLOCK_GETTIME
134 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
135 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
136 return -1;
137 }
138 # else
139 if (0 != getclock(TIMEOFDAY, &until)) {
140 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
141 return -1;
142 }
143 # endif
144 until.tv_sec += seconds;
145 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
146 if (0 == rc)
147 return -1;
148 if (-1 == rc && ETIMEDOUT == errno)
149 return 0;
150 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
151 return -1;
152 }
153
154
155 /* --------------------------------------------------------------------
156 * Wake up a worker that takes a nap.
157 */
158 void
interrupt_worker_sleep(void)159 interrupt_worker_sleep(void)
160 {
161 u_int idx;
162 blocking_child * c;
163
164 for (idx = 0; idx < blocking_children_alloc; idx++) {
165 c = blocking_children[idx];
166 if (NULL == c || NULL == c->wake_scheduled_sleep)
167 continue;
168 tickle_sem(c->wake_scheduled_sleep);
169 }
170 }
171
172 /* --------------------------------------------------------------------
173 * Make sure there is an empty slot at the head of the request
174 * queue. Tell if the queue is currently empty.
175 */
176 static int
ensure_workitems_empty_slot(blocking_child * c)177 ensure_workitems_empty_slot(
178 blocking_child *c
179 )
180 {
181 /*
182 ** !!! PRECONDITION: caller holds access lock!
183 **
184 ** This simply tries to increase the size of the buffer if it
185 ** becomes full. The resize operation does *not* maintain the
186 ** order of requests, but that should be irrelevant since the
187 ** processing is considered asynchronous anyway.
188 **
189 ** Return if the buffer is currently empty.
190 */
191
192 static const size_t each =
193 sizeof(blocking_children[0]->workitems[0]);
194
195 size_t new_alloc;
196 size_t slots_used;
197 size_t sidx;
198
199 slots_used = c->head_workitem - c->tail_workitem;
200 if (slots_used >= c->workitems_alloc) {
201 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
202 c->workitems = erealloc(c->workitems, new_alloc * each);
203 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
204 c->workitems[sidx] = NULL;
205 c->tail_workitem = 0;
206 c->head_workitem = c->workitems_alloc;
207 c->workitems_alloc = new_alloc;
208 }
209 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
210 return (0 == slots_used);
211 }
212
213 /* --------------------------------------------------------------------
214 * Make sure there is an empty slot at the head of the response
215 * queue. Tell if the queue is currently empty.
216 */
217 static int
ensure_workresp_empty_slot(blocking_child * c)218 ensure_workresp_empty_slot(
219 blocking_child *c
220 )
221 {
222 /*
223 ** !!! PRECONDITION: caller holds access lock!
224 **
225 ** Works like the companion function above.
226 */
227
228 static const size_t each =
229 sizeof(blocking_children[0]->responses[0]);
230
231 size_t new_alloc;
232 size_t slots_used;
233 size_t sidx;
234
235 slots_used = c->head_response - c->tail_response;
236 if (slots_used >= c->responses_alloc) {
237 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
238 c->responses = erealloc(c->responses, new_alloc * each);
239 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
240 c->responses[sidx] = NULL;
241 c->tail_response = 0;
242 c->head_response = c->responses_alloc;
243 c->responses_alloc = new_alloc;
244 }
245 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
246 return (0 == slots_used);
247 }
248
249
250 /* --------------------------------------------------------------------
251 * queue_req_pointer() - append a work item or idle exit request to
252 * blocking_workitems[]. Employ proper locking.
253 */
254 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)255 queue_req_pointer(
256 blocking_child * c,
257 blocking_pipe_header * hdr
258 )
259 {
260 size_t qhead;
261
262 /* >>>> ACCESS LOCKING STARTS >>>> */
263 wait_for_sem(c->accesslock, NULL);
264 ensure_workitems_empty_slot(c);
265 qhead = c->head_workitem;
266 c->workitems[qhead % c->workitems_alloc] = hdr;
267 c->head_workitem = 1 + qhead;
268 tickle_sem(c->accesslock);
269 /* <<<< ACCESS LOCKING ENDS <<<< */
270
271 /* queue consumer wake-up notification */
272 tickle_sem(c->workitems_pending);
273
274 return 0;
275 }
276
277 /* --------------------------------------------------------------------
278 * API function to make sure a worker is running, a proper private copy
279 * of the data is made, the data eneterd into the queue and the worker
280 * is signalled.
281 */
282 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)283 send_blocking_req_internal(
284 blocking_child * c,
285 blocking_pipe_header * hdr,
286 void * data
287 )
288 {
289 blocking_pipe_header * threadcopy;
290 size_t payload_octets;
291
292 REQUIRE(hdr != NULL);
293 REQUIRE(data != NULL);
294 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
295
296 if (hdr->octets <= sizeof(*hdr))
297 return 1; /* failure */
298 payload_octets = hdr->octets - sizeof(*hdr);
299
300 if (NULL == c->thread_ref)
301 start_blocking_thread(c);
302 threadcopy = emalloc(hdr->octets);
303 memcpy(threadcopy, hdr, sizeof(*hdr));
304 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
305
306 return queue_req_pointer(c, threadcopy);
307 }
308
309 /* --------------------------------------------------------------------
310 * Wait for the 'incoming queue no longer empty' signal, lock the shared
311 * structure and dequeue an item.
312 */
313 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)314 receive_blocking_req_internal(
315 blocking_child * c
316 )
317 {
318 blocking_pipe_header * req;
319 size_t qhead, qtail;
320
321 req = NULL;
322 do {
323 /* wait for tickle from the producer side */
324 wait_for_sem(c->workitems_pending, NULL);
325
326 /* >>>> ACCESS LOCKING STARTS >>>> */
327 wait_for_sem(c->accesslock, NULL);
328 qhead = c->head_workitem;
329 do {
330 qtail = c->tail_workitem;
331 if (qhead == qtail)
332 break;
333 c->tail_workitem = qtail + 1;
334 qtail %= c->workitems_alloc;
335 req = c->workitems[qtail];
336 c->workitems[qtail] = NULL;
337 } while (NULL == req);
338 tickle_sem(c->accesslock);
339 /* <<<< ACCESS LOCKING ENDS <<<< */
340
341 } while (NULL == req);
342
343 INSIST(NULL != req);
344 if (CHILD_EXIT_REQ == req) { /* idled out */
345 send_blocking_resp_internal(c, CHILD_GONE_RESP);
346 req = NULL;
347 }
348
349 return req;
350 }
351
352 /* --------------------------------------------------------------------
353 * Push a response into the return queue and eventually tickle the
354 * receiver.
355 */
356 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)357 send_blocking_resp_internal(
358 blocking_child * c,
359 blocking_pipe_header * resp
360 )
361 {
362 size_t qhead;
363 int empty;
364
365 /* >>>> ACCESS LOCKING STARTS >>>> */
366 wait_for_sem(c->accesslock, NULL);
367 empty = ensure_workresp_empty_slot(c);
368 qhead = c->head_response;
369 c->responses[qhead % c->responses_alloc] = resp;
370 c->head_response = 1 + qhead;
371 tickle_sem(c->accesslock);
372 /* <<<< ACCESS LOCKING ENDS <<<< */
373
374 /* queue consumer wake-up notification */
375 if (empty)
376 {
377 # ifdef WORK_PIPE
378 if (1 != write(c->resp_write_pipe, "", 1))
379 msyslog(LOG_WARNING, "async resolver: %s",
380 "failed to notify main thread!");
381 # else
382 tickle_sem(c->responses_pending);
383 # endif
384 }
385 return 0;
386 }
387
388
389 #ifndef WORK_PIPE
390
391 /* --------------------------------------------------------------------
392 * Check if a (Windows-)handle to a semaphore is actually the same we
393 * are using inside the sema wrapper.
394 */
395 static BOOL
same_os_sema(const sem_ref obj,void * osh)396 same_os_sema(
397 const sem_ref obj,
398 void* osh
399 )
400 {
401 return obj && osh && (obj->shnd == (HANDLE)osh);
402 }
403
404 /* --------------------------------------------------------------------
405 * Find the shared context that associates to an OS handle and make sure
406 * the data is dequeued and processed.
407 */
408 void
handle_blocking_resp_sem(void * context)409 handle_blocking_resp_sem(
410 void * context
411 )
412 {
413 blocking_child * c;
414 u_int idx;
415
416 c = NULL;
417 for (idx = 0; idx < blocking_children_alloc; idx++) {
418 c = blocking_children[idx];
419 if (c != NULL &&
420 c->thread_ref != NULL &&
421 same_os_sema(c->responses_pending, context))
422 break;
423 }
424 if (idx < blocking_children_alloc)
425 process_blocking_resp(c);
426 }
427 #endif /* !WORK_PIPE */
428
429 /* --------------------------------------------------------------------
430 * Fetch the next response from the return queue. In case of signalling
431 * via pipe, make sure the pipe is flushed, too.
432 */
433 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)434 receive_blocking_resp_internal(
435 blocking_child * c
436 )
437 {
438 blocking_pipe_header * removed;
439 size_t qhead, qtail, slot;
440
441 #ifdef WORK_PIPE
442 int rc;
443 char scratch[32];
444
445 do
446 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
447 while (-1 == rc && EINTR == errno);
448 #endif
449
450 /* >>>> ACCESS LOCKING STARTS >>>> */
451 wait_for_sem(c->accesslock, NULL);
452 qhead = c->head_response;
453 qtail = c->tail_response;
454 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
455 slot = qtail % c->responses_alloc;
456 removed = c->responses[slot];
457 c->responses[slot] = NULL;
458 }
459 c->tail_response = qtail;
460 tickle_sem(c->accesslock);
461 /* <<<< ACCESS LOCKING ENDS <<<< */
462
463 if (NULL != removed) {
464 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
465 BLOCKING_RESP_MAGIC == removed->magic_sig);
466 }
467 if (CHILD_GONE_RESP == removed) {
468 cleanup_after_child(c);
469 removed = NULL;
470 }
471
472 return removed;
473 }
474
475 /* --------------------------------------------------------------------
476 * Light up a new worker.
477 */
478 static void
start_blocking_thread(blocking_child * c)479 start_blocking_thread(
480 blocking_child * c
481 )
482 {
483
484 DEBUG_INSIST(!c->reusable);
485
486 prepare_child_sems(c);
487 start_blocking_thread_internal(c);
488 }
489
490 /* --------------------------------------------------------------------
491 * Create a worker thread. There are several differences between POSIX
492 * and Windows, of course -- most notably the Windows thread is no
493 * detached thread, and we keep the handle around until we want to get
494 * rid of the thread. The notification scheme also differs: Windows
495 * makes use of semaphores in both directions, POSIX uses a pipe for
496 * integration with 'select()' or alike.
497 */
498 static void
start_blocking_thread_internal(blocking_child * c)499 start_blocking_thread_internal(
500 blocking_child * c
501 )
502 #ifdef SYS_WINNT
503 {
504 BOOL resumed;
505
506 c->thread_ref = NULL;
507 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
508 c->thr_table[0].thnd =
509 (HANDLE)_beginthreadex(
510 NULL,
511 0,
512 &blocking_thread,
513 c,
514 CREATE_SUSPENDED,
515 NULL);
516
517 if (NULL == c->thr_table[0].thnd) {
518 msyslog(LOG_ERR, "start blocking thread failed: %m");
519 exit(-1);
520 }
521 /* remember the thread priority is only within the process class */
522 if (!SetThreadPriority(c->thr_table[0].thnd,
523 THREAD_PRIORITY_BELOW_NORMAL))
524 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
525
526 resumed = ResumeThread(c->thr_table[0].thnd);
527 DEBUG_INSIST(resumed);
528 c->thread_ref = &c->thr_table[0];
529 }
530 #else /* pthreads start_blocking_thread_internal() follows */
531 {
532 # ifdef NEED_PTHREAD_INIT
533 static int pthread_init_called;
534 # endif
535 pthread_attr_t thr_attr;
536 int rc;
537 int pipe_ends[2]; /* read then write */
538 int is_pipe;
539 int flags;
540 size_t ostacksize;
541 size_t nstacksize;
542 sigset_t saved_sig_mask;
543
544 c->thread_ref = NULL;
545
546 # ifdef NEED_PTHREAD_INIT
547 /*
548 * from lib/isc/unix/app.c:
549 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
550 */
551 if (!pthread_init_called) {
552 pthread_init();
553 pthread_init_called = TRUE;
554 }
555 # endif
556
557 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
558 if (0 != rc) {
559 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
560 exit(1);
561 }
562 c->resp_read_pipe = move_fd(pipe_ends[0]);
563 c->resp_write_pipe = move_fd(pipe_ends[1]);
564 c->ispipe = is_pipe;
565 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
566 if (-1 == flags) {
567 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
568 exit(1);
569 }
570 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
571 if (-1 == rc) {
572 msyslog(LOG_ERR,
573 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
574 exit(1);
575 }
576 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
577 pthread_attr_init(&thr_attr);
578 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
579 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
580 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
581 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
582 if (0 != rc) {
583 msyslog(LOG_ERR,
584 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
585 strerror(rc));
586 } else {
587 nstacksize = ostacksize;
588 /* order is important here: first clamp on upper limit,
589 * and the PTHREAD min stack size is ultimate override!
590 */
591 if (nstacksize > THREAD_MAXSTACKSIZE)
592 nstacksize = THREAD_MAXSTACKSIZE;
593 # ifdef PTHREAD_STACK_MAX
594 if (nstacksize > PTHREAD_STACK_MAX)
595 nstacksize = PTHREAD_STACK_MAX;
596 # endif
597
598 /* now clamp on lower stack limit. */
599 if (nstacksize < THREAD_MINSTACKSIZE)
600 nstacksize = THREAD_MINSTACKSIZE;
601 # ifdef PTHREAD_STACK_MIN
602 if (nstacksize < PTHREAD_STACK_MIN)
603 nstacksize = PTHREAD_STACK_MIN;
604 # endif
605
606 if (nstacksize != ostacksize)
607 rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
608 if (0 != rc)
609 msyslog(LOG_ERR,
610 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
611 (u_long)ostacksize, (u_long)nstacksize,
612 strerror(rc));
613 }
614 #else
615 UNUSED_ARG(nstacksize);
616 UNUSED_ARG(ostacksize);
617 #endif
618 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
619 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
620 #endif
621 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
622 block_thread_signals(&saved_sig_mask);
623 rc = pthread_create(&c->thr_table[0], &thr_attr,
624 &blocking_thread, c);
625 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
626 pthread_attr_destroy(&thr_attr);
627 if (0 != rc) {
628 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
629 strerror(rc));
630 exit(1);
631 }
632 c->thread_ref = &c->thr_table[0];
633 }
634 #endif
635
636 /* --------------------------------------------------------------------
637 * block_thread_signals()
638 *
639 * Temporarily block signals used by ntpd main thread, so that signal
640 * mask inherited by child threads leaves them blocked. Returns prior
641 * active signal mask via pmask, to be restored by the main thread
642 * after pthread_create().
643 */
644 #ifndef SYS_WINNT
645 void
block_thread_signals(sigset_t * pmask)646 block_thread_signals(
647 sigset_t * pmask
648 )
649 {
650 sigset_t block;
651
652 sigemptyset(&block);
653 # ifdef HAVE_SIGNALED_IO
654 # ifdef SIGIO
655 sigaddset(&block, SIGIO);
656 # endif
657 # ifdef SIGPOLL
658 sigaddset(&block, SIGPOLL);
659 # endif
660 # endif /* HAVE_SIGNALED_IO */
661 sigaddset(&block, SIGALRM);
662 sigaddset(&block, MOREDEBUGSIG);
663 sigaddset(&block, LESSDEBUGSIG);
664 # ifdef SIGDIE1
665 sigaddset(&block, SIGDIE1);
666 # endif
667 # ifdef SIGDIE2
668 sigaddset(&block, SIGDIE2);
669 # endif
670 # ifdef SIGDIE3
671 sigaddset(&block, SIGDIE3);
672 # endif
673 # ifdef SIGDIE4
674 sigaddset(&block, SIGDIE4);
675 # endif
676 # ifdef SIGBUS
677 sigaddset(&block, SIGBUS);
678 # endif
679 sigemptyset(pmask);
680 pthread_sigmask(SIG_BLOCK, &block, pmask);
681 }
682 #endif /* !SYS_WINNT */
683
684
685 /* --------------------------------------------------------------------
686 * Create & destroy semaphores. This is sufficiently different between
687 * POSIX and Windows to warrant wrapper functions and close enough to
688 * use the concept of synchronization via semaphore for all platforms.
689 */
690 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)691 create_sema(
692 sema_type* semptr,
693 u_int inival,
694 u_int maxval)
695 {
696 #ifdef SYS_WINNT
697
698 long svini, svmax;
699 if (NULL != semptr) {
700 svini = (inival < LONG_MAX)
701 ? (long)inival : LONG_MAX;
702 svmax = (maxval < LONG_MAX && maxval > 0)
703 ? (long)maxval : LONG_MAX;
704 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
705 if (NULL == semptr->shnd)
706 semptr = NULL;
707 }
708
709 #else
710
711 (void)maxval;
712 if (semptr && sem_init(semptr, FALSE, inival))
713 semptr = NULL;
714
715 #endif
716
717 return semptr;
718 }
719
720 /* ------------------------------------------------------------------ */
721 static sem_ref
delete_sema(sem_ref obj)722 delete_sema(
723 sem_ref obj)
724 {
725
726 # ifdef SYS_WINNT
727
728 if (obj) {
729 if (obj->shnd)
730 CloseHandle(obj->shnd);
731 obj->shnd = NULL;
732 }
733
734 # else
735
736 if (obj)
737 sem_destroy(obj);
738
739 # endif
740
741 return NULL;
742 }
743
744 /* --------------------------------------------------------------------
745 * prepare_child_sems()
746 *
747 * create sync & access semaphores
748 *
749 * All semaphores are cleared, only the access semaphore has 1 unit.
750 * Childs wait on 'workitems_pending', then grabs 'sema_access'
751 * and dequeues jobs. When done, 'sema_access' is given one unit back.
752 *
753 * The producer grabs 'sema_access', manages the queue, restores
754 * 'sema_access' and puts one unit into 'workitems_pending'.
755 *
756 * The story goes the same for the response queue.
757 */
758 static void
prepare_child_sems(blocking_child * c)759 prepare_child_sems(
760 blocking_child *c
761 )
762 {
763 if (NULL == worker_memlock)
764 worker_memlock = create_sema(&worker_mmutex, 1, 1);
765
766 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
767 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
768 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
769 # ifndef WORK_PIPE
770 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
771 # endif
772 }
773
774 /* --------------------------------------------------------------------
775 * wait for semaphore. Where the wait can be interrupted, it will
776 * internally resume -- When this function returns, there is either no
777 * semaphore at all, a timeout occurred, or the caller could
778 * successfully take a token from the semaphore.
779 *
780 * For untimed wait, not checking the result of this function at all is
781 * definitely an option.
782 */
783 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)784 wait_for_sem(
785 sem_ref sem,
786 struct timespec * timeout /* wall-clock */
787 )
788 #ifdef SYS_WINNT
789 {
790 struct timespec now;
791 struct timespec delta;
792 DWORD msec;
793 DWORD rc;
794
795 if (!(sem && sem->shnd)) {
796 errno = EINVAL;
797 return -1;
798 }
799
800 if (NULL == timeout) {
801 msec = INFINITE;
802 } else {
803 getclock(TIMEOFDAY, &now);
804 delta = sub_tspec(*timeout, now);
805 if (delta.tv_sec < 0) {
806 msec = 0;
807 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
808 msec = INFINITE;
809 } else {
810 msec = 1000 * (DWORD)delta.tv_sec;
811 msec += delta.tv_nsec / (1000 * 1000);
812 }
813 }
814 rc = WaitForSingleObject(sem->shnd, msec);
815 if (WAIT_OBJECT_0 == rc)
816 return 0;
817 if (WAIT_TIMEOUT == rc) {
818 errno = ETIMEDOUT;
819 return -1;
820 }
821 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
822 errno = EFAULT;
823 return -1;
824 }
825 #else /* pthreads wait_for_sem() follows */
826 {
827 int rc = -1;
828
829 if (sem) do {
830 if (NULL == timeout)
831 rc = sem_wait(sem);
832 else
833 rc = sem_timedwait(sem, timeout);
834 } while (rc == -1 && errno == EINTR);
835 else
836 errno = EINVAL;
837
838 return rc;
839 }
840 #endif
841
842 /* --------------------------------------------------------------------
843 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
844 * calling conventions under Windows and POSIX-defined signature
845 * otherwise.
846 */
847 #ifdef SYS_WINNT
848 u_int WINAPI
849 #else
850 void *
851 #endif
blocking_thread(void * ThreadArg)852 blocking_thread(
853 void * ThreadArg
854 )
855 {
856 blocking_child *c;
857
858 c = ThreadArg;
859 exit_worker(blocking_child_common(c));
860
861 /* NOTREACHED */
862 return 0;
863 }
864
865 /* --------------------------------------------------------------------
866 * req_child_exit() runs in the parent.
867 *
868 * This function is called from from the idle timer, too, and possibly
869 * without a thread being there any longer. Since we have folded up our
870 * tent in that case and all the semaphores are already gone, we simply
871 * ignore this request in this case.
872 *
873 * Since the existence of the semaphores is controlled exclusively by
874 * the parent, there's no risk of data race here.
875 */
876 int
req_child_exit(blocking_child * c)877 req_child_exit(
878 blocking_child *c
879 )
880 {
881 return (c->accesslock)
882 ? queue_req_pointer(c, CHILD_EXIT_REQ)
883 : 0;
884 }
885
886 /* --------------------------------------------------------------------
887 * cleanup_after_child() runs in parent.
888 */
889 static void
cleanup_after_child(blocking_child * c)890 cleanup_after_child(
891 blocking_child * c
892 )
893 {
894 DEBUG_INSIST(!c->reusable);
895
896 # ifdef SYS_WINNT
897 /* The thread was not created in detached state, so we better
898 * clean up.
899 */
900 if (c->thread_ref && c->thread_ref->thnd) {
901 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
902 INSIST(CloseHandle(c->thread_ref->thnd));
903 c->thread_ref->thnd = NULL;
904 }
905 # endif
906 c->thread_ref = NULL;
907
908 /* remove semaphores and (if signalling vi IO) pipes */
909
910 c->accesslock = delete_sema(c->accesslock);
911 c->workitems_pending = delete_sema(c->workitems_pending);
912 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
913
914 # ifdef WORK_PIPE
915 DEBUG_INSIST(-1 != c->resp_read_pipe);
916 DEBUG_INSIST(-1 != c->resp_write_pipe);
917 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
918 close(c->resp_write_pipe);
919 close(c->resp_read_pipe);
920 c->resp_write_pipe = -1;
921 c->resp_read_pipe = -1;
922 # else
923 DEBUG_INSIST(NULL != c->responses_pending);
924 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
925 c->responses_pending = delete_sema(c->responses_pending);
926 # endif
927
928 /* Is it necessary to check if there are pending requests and
929 * responses? If so, and if there are, what to do with them?
930 */
931
932 /* re-init buffer index sequencers */
933 c->head_workitem = 0;
934 c->tail_workitem = 0;
935 c->head_response = 0;
936 c->tail_response = 0;
937
938 c->reusable = TRUE;
939 }
940
941
942 #else /* !WORK_THREAD follows */
943 char work_thread_nonempty_compilation_unit;
944 #endif
945