1 /* $NetBSD: work_thread.c,v 1.9 2024/08/18 20:47:13 christos Exp $ */
2
3 /*
4 * work_thread.c - threads implementation for blocking worker child.
5 */
6 #include <config.h>
7 #include "ntp_workimpl.h"
8
9 #ifdef WORK_THREAD
10
11 #include <stdio.h>
12 #include <ctype.h>
13 #include <signal.h>
14 #ifndef SYS_WINNT
15 #include <pthread.h>
16 #endif
17
18 #include "ntp_stdlib.h"
19 #include "ntp_malloc.h"
20 #include "ntp_syslog.h"
21 #include "ntpd.h"
22 #include "ntp_io.h"
23 #include "ntp_assert.h"
24 #include "ntp_unixtime.h"
25 #include "timespecops.h"
26 #include "ntp_worker.h"
27
28 #define CHILD_EXIT_REQ ((blocking_pipe_header *)(intptr_t)-1)
29 #define CHILD_GONE_RESP CHILD_EXIT_REQ
30 /* Queue size increments:
31 * The request queue grows a bit faster than the response queue -- the
32 * daemon can push requests and pull results faster on avarage than the
33 * worker can process requests and push results... If this really pays
34 * off is debatable.
35 */
36 #define WORKITEMS_ALLOC_INC 16
37 #define RESPONSES_ALLOC_INC 4
38
39 /* Fiddle with min/max stack sizes. 64kB minimum seems to work, so we
40 * set the maximum to 256kB. If the minimum goes below the
41 * system-defined minimum stack size, we have to adjust accordingly.
42 */
43 #ifndef THREAD_MINSTACKSIZE
44 # define THREAD_MINSTACKSIZE (64U * 1024)
45 #endif
46
47 #ifndef THREAD_MAXSTACKSIZE
48 # define THREAD_MAXSTACKSIZE (256U * 1024)
49 #endif
50
51 /* need a good integer to store a pointer... */
52 #ifndef UINTPTR_T
53 # if defined(UINTPTR_MAX)
54 # define UINTPTR_T uintptr_t
55 # elif defined(UINT_PTR)
56 # define UINTPTR_T UINT_PTR
57 # else
58 # define UINTPTR_T size_t
59 # endif
60 #endif
61
62
63 #ifdef SYS_WINNT
64
65 # define thread_exit(c) _endthreadex(c)
66 # define tickle_sem(sh) ReleaseSemaphore((sh->shnd), 1, NULL)
67 u_int WINAPI blocking_thread(void *);
68 static BOOL same_os_sema(const sem_ref obj, void * osobj);
69
70 #else
71
72 # define thread_exit(c) pthread_exit((void*)(UINTPTR_T)(c))
73 # define tickle_sem sem_post
74 void * blocking_thread(void *);
75 static void block_thread_signals(sigset_t *);
76
77 #endif
78
79 #ifdef WORK_PIPE
80 addremove_io_fd_func addremove_io_fd;
81 #else
82 addremove_io_semaphore_func addremove_io_semaphore;
83 #endif
84
85 static void start_blocking_thread(blocking_child *);
86 static void start_blocking_thread_internal(blocking_child *);
87 static void prepare_child_sems(blocking_child *);
88 static int wait_for_sem(sem_ref, struct timespec *);
89 static int ensure_workitems_empty_slot(blocking_child *);
90 static int ensure_workresp_empty_slot(blocking_child *);
91 static int queue_req_pointer(blocking_child *, blocking_pipe_header *);
92 static void cleanup_after_child(blocking_child *);
93
94 static sema_type worker_mmutex;
95 static sem_ref worker_memlock;
96
97 /* --------------------------------------------------------------------
98 * locking the global worker state table (and other global stuff)
99 */
100 void
worker_global_lock(int inOrOut)101 worker_global_lock(
102 int inOrOut)
103 {
104 if (worker_memlock) {
105 if (inOrOut)
106 wait_for_sem(worker_memlock, NULL);
107 else
108 tickle_sem(worker_memlock);
109 }
110 }
111
112 /* --------------------------------------------------------------------
113 * implementation isolation wrapper
114 */
115 void
exit_worker(int exitcode)116 exit_worker(
117 int exitcode
118 )
119 {
120 thread_exit(exitcode); /* see #define thread_exit */
121 }
122
123 /* --------------------------------------------------------------------
124 * sleep for a given time or until the wakup semaphore is tickled.
125 */
126 int
worker_sleep(blocking_child * c,time_t seconds)127 worker_sleep(
128 blocking_child * c,
129 time_t seconds
130 )
131 {
132 struct timespec until;
133 int rc;
134
135 # ifdef HAVE_CLOCK_GETTIME
136 if (0 != clock_gettime(CLOCK_REALTIME, &until)) {
137 msyslog(LOG_ERR, "worker_sleep: clock_gettime() failed: %m");
138 return -1;
139 }
140 # else
141 if (0 != getclock(TIMEOFDAY, &until)) {
142 msyslog(LOG_ERR, "worker_sleep: getclock() failed: %m");
143 return -1;
144 }
145 # endif
146 until.tv_sec += seconds;
147 rc = wait_for_sem(c->wake_scheduled_sleep, &until);
148 if (0 == rc)
149 return -1;
150 if (-1 == rc && ETIMEDOUT == errno)
151 return 0;
152 msyslog(LOG_ERR, "worker_sleep: sem_timedwait: %m");
153 return -1;
154 }
155
156
157 /* --------------------------------------------------------------------
158 * Wake up a worker that takes a nap.
159 */
160 void
interrupt_worker_sleep(void)161 interrupt_worker_sleep(void)
162 {
163 u_int idx;
164 blocking_child * c;
165
166 for (idx = 0; idx < blocking_children_alloc; idx++) {
167 c = blocking_children[idx];
168 if (NULL == c || NULL == c->wake_scheduled_sleep)
169 continue;
170 tickle_sem(c->wake_scheduled_sleep);
171 }
172 }
173
174 /* --------------------------------------------------------------------
175 * Make sure there is an empty slot at the head of the request
176 * queue. Tell if the queue is currently empty.
177 */
178 static int
ensure_workitems_empty_slot(blocking_child * c)179 ensure_workitems_empty_slot(
180 blocking_child *c
181 )
182 {
183 /*
184 ** !!! PRECONDITION: caller holds access lock!
185 **
186 ** This simply tries to increase the size of the buffer if it
187 ** becomes full. The resize operation does *not* maintain the
188 ** order of requests, but that should be irrelevant since the
189 ** processing is considered asynchronous anyway.
190 **
191 ** Return if the buffer is currently empty.
192 */
193
194 static const size_t each =
195 sizeof(blocking_children[0]->workitems[0]);
196
197 size_t new_alloc;
198 size_t slots_used;
199 size_t sidx;
200
201 slots_used = c->head_workitem - c->tail_workitem;
202 if (slots_used >= c->workitems_alloc) {
203 new_alloc = c->workitems_alloc + WORKITEMS_ALLOC_INC;
204 c->workitems = erealloc(c->workitems, new_alloc * each);
205 for (sidx = c->workitems_alloc; sidx < new_alloc; ++sidx)
206 c->workitems[sidx] = NULL;
207 c->tail_workitem = 0;
208 c->head_workitem = c->workitems_alloc;
209 c->workitems_alloc = new_alloc;
210 }
211 INSIST(NULL == c->workitems[c->head_workitem % c->workitems_alloc]);
212 return (0 == slots_used);
213 }
214
215 /* --------------------------------------------------------------------
216 * Make sure there is an empty slot at the head of the response
217 * queue. Tell if the queue is currently empty.
218 */
219 static int
ensure_workresp_empty_slot(blocking_child * c)220 ensure_workresp_empty_slot(
221 blocking_child *c
222 )
223 {
224 /*
225 ** !!! PRECONDITION: caller holds access lock!
226 **
227 ** Works like the companion function above.
228 */
229
230 static const size_t each =
231 sizeof(blocking_children[0]->responses[0]);
232
233 size_t new_alloc;
234 size_t slots_used;
235 size_t sidx;
236
237 slots_used = c->head_response - c->tail_response;
238 if (slots_used >= c->responses_alloc) {
239 new_alloc = c->responses_alloc + RESPONSES_ALLOC_INC;
240 c->responses = erealloc(c->responses, new_alloc * each);
241 for (sidx = c->responses_alloc; sidx < new_alloc; ++sidx)
242 c->responses[sidx] = NULL;
243 c->tail_response = 0;
244 c->head_response = c->responses_alloc;
245 c->responses_alloc = new_alloc;
246 }
247 INSIST(NULL == c->responses[c->head_response % c->responses_alloc]);
248 return (0 == slots_used);
249 }
250
251
252 /* --------------------------------------------------------------------
253 * queue_req_pointer() - append a work item or idle exit request to
254 * blocking_workitems[]. Employ proper locking.
255 */
256 static int
queue_req_pointer(blocking_child * c,blocking_pipe_header * hdr)257 queue_req_pointer(
258 blocking_child * c,
259 blocking_pipe_header * hdr
260 )
261 {
262 size_t qhead;
263
264 /* >>>> ACCESS LOCKING STARTS >>>> */
265 wait_for_sem(c->accesslock, NULL);
266 ensure_workitems_empty_slot(c);
267 qhead = c->head_workitem;
268 c->workitems[qhead % c->workitems_alloc] = hdr;
269 c->head_workitem = 1 + qhead;
270 tickle_sem(c->accesslock);
271 /* <<<< ACCESS LOCKING ENDS <<<< */
272
273 /* queue consumer wake-up notification */
274 tickle_sem(c->workitems_pending);
275
276 return 0;
277 }
278
279 /* --------------------------------------------------------------------
280 * API function to make sure a worker is running, a proper private copy
281 * of the data is made, the data eneterd into the queue and the worker
282 * is signalled.
283 */
284 int
send_blocking_req_internal(blocking_child * c,blocking_pipe_header * hdr,void * data)285 send_blocking_req_internal(
286 blocking_child * c,
287 blocking_pipe_header * hdr,
288 void * data
289 )
290 {
291 blocking_pipe_header * threadcopy;
292 size_t payload_octets;
293
294 REQUIRE(hdr != NULL);
295 REQUIRE(data != NULL);
296 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == hdr->magic_sig);
297
298 if (hdr->octets <= sizeof(*hdr))
299 return 1; /* failure */
300 payload_octets = hdr->octets - sizeof(*hdr);
301
302 if (NULL == c->thread_ref)
303 start_blocking_thread(c);
304 threadcopy = emalloc(hdr->octets);
305 memcpy(threadcopy, hdr, sizeof(*hdr));
306 memcpy((char *)threadcopy + sizeof(*hdr), data, payload_octets);
307
308 return queue_req_pointer(c, threadcopy);
309 }
310
311 /* --------------------------------------------------------------------
312 * Wait for the 'incoming queue no longer empty' signal, lock the shared
313 * structure and dequeue an item.
314 */
315 blocking_pipe_header *
receive_blocking_req_internal(blocking_child * c)316 receive_blocking_req_internal(
317 blocking_child * c
318 )
319 {
320 blocking_pipe_header * req;
321 size_t qhead, qtail;
322
323 req = NULL;
324 do {
325 /* wait for tickle from the producer side */
326 wait_for_sem(c->workitems_pending, NULL);
327
328 /* >>>> ACCESS LOCKING STARTS >>>> */
329 wait_for_sem(c->accesslock, NULL);
330 qhead = c->head_workitem;
331 do {
332 qtail = c->tail_workitem;
333 if (qhead == qtail)
334 break;
335 c->tail_workitem = qtail + 1;
336 qtail %= c->workitems_alloc;
337 req = c->workitems[qtail];
338 c->workitems[qtail] = NULL;
339 } while (NULL == req);
340 tickle_sem(c->accesslock);
341 /* <<<< ACCESS LOCKING ENDS <<<< */
342
343 } while (NULL == req);
344
345 INSIST(NULL != req);
346 if (CHILD_EXIT_REQ == req) { /* idled out */
347 send_blocking_resp_internal(c, CHILD_GONE_RESP);
348 req = NULL;
349 }
350
351 return req;
352 }
353
354 /* --------------------------------------------------------------------
355 * Push a response into the return queue and eventually tickle the
356 * receiver.
357 */
358 int
send_blocking_resp_internal(blocking_child * c,blocking_pipe_header * resp)359 send_blocking_resp_internal(
360 blocking_child * c,
361 blocking_pipe_header * resp
362 )
363 {
364 size_t qhead;
365 int empty;
366
367 /* >>>> ACCESS LOCKING STARTS >>>> */
368 wait_for_sem(c->accesslock, NULL);
369 empty = ensure_workresp_empty_slot(c);
370 qhead = c->head_response;
371 c->responses[qhead % c->responses_alloc] = resp;
372 c->head_response = 1 + qhead;
373 tickle_sem(c->accesslock);
374 /* <<<< ACCESS LOCKING ENDS <<<< */
375
376 /* queue consumer wake-up notification */
377 if (empty)
378 {
379 # ifdef WORK_PIPE
380 if (1 != write(c->resp_write_pipe, "", 1))
381 msyslog(LOG_WARNING, "async resolver: blocking_get%sinfo"
382 " failed to notify main thread!",
383 (BLOCKING_GETNAMEINFO == resp->rtype)
384 ? "name"
385 : "addr"
386 );
387 # else
388 tickle_sem(c->responses_pending);
389 # endif
390 }
391 return 0;
392 }
393
394
395 #ifndef WORK_PIPE
396
397 /* --------------------------------------------------------------------
398 * Check if a (Windows-)handle to a semaphore is actually the same we
399 * are using inside the sema wrapper.
400 */
401 static BOOL
same_os_sema(const sem_ref obj,void * osh)402 same_os_sema(
403 const sem_ref obj,
404 void* osh
405 )
406 {
407 return obj && osh && (obj->shnd == (HANDLE)osh);
408 }
409
410 /* --------------------------------------------------------------------
411 * Find the shared context that associates to an OS handle and make sure
412 * the data is dequeued and processed.
413 */
414 void
handle_blocking_resp_sem(void * context)415 handle_blocking_resp_sem(
416 void * context
417 )
418 {
419 blocking_child * c;
420 u_int idx;
421
422 c = NULL;
423 for (idx = 0; idx < blocking_children_alloc; idx++) {
424 c = blocking_children[idx];
425 if (c != NULL &&
426 c->thread_ref != NULL &&
427 same_os_sema(c->responses_pending, context))
428 break;
429 }
430 if (idx < blocking_children_alloc)
431 process_blocking_resp(c);
432 }
433 #endif /* !WORK_PIPE */
434
435 /* --------------------------------------------------------------------
436 * Fetch the next response from the return queue. In case of signalling
437 * via pipe, make sure the pipe is flushed, too.
438 */
439 blocking_pipe_header *
receive_blocking_resp_internal(blocking_child * c)440 receive_blocking_resp_internal(
441 blocking_child * c
442 )
443 {
444 blocking_pipe_header * removed;
445 size_t qhead, qtail, slot;
446
447 #ifdef WORK_PIPE
448 int rc;
449 char scratch[32];
450
451 do
452 rc = read(c->resp_read_pipe, scratch, sizeof(scratch));
453 while (-1 == rc && EINTR == errno);
454 #endif
455
456 /* >>>> ACCESS LOCKING STARTS >>>> */
457 wait_for_sem(c->accesslock, NULL);
458 qhead = c->head_response;
459 qtail = c->tail_response;
460 for (removed = NULL; !removed && (qhead != qtail); ++qtail) {
461 slot = qtail % c->responses_alloc;
462 removed = c->responses[slot];
463 c->responses[slot] = NULL;
464 }
465 c->tail_response = qtail;
466 tickle_sem(c->accesslock);
467 /* <<<< ACCESS LOCKING ENDS <<<< */
468
469 if (NULL != removed) {
470 DEBUG_ENSURE(CHILD_GONE_RESP == removed ||
471 BLOCKING_RESP_MAGIC == removed->magic_sig);
472 }
473 if (CHILD_GONE_RESP == removed) {
474 cleanup_after_child(c);
475 removed = NULL;
476 }
477
478 return removed;
479 }
480
481 /* --------------------------------------------------------------------
482 * Light up a new worker.
483 */
484 static void
start_blocking_thread(blocking_child * c)485 start_blocking_thread(
486 blocking_child * c
487 )
488 {
489
490 DEBUG_INSIST(!c->reusable);
491
492 prepare_child_sems(c);
493 start_blocking_thread_internal(c);
494 }
495
496 /* --------------------------------------------------------------------
497 * Create a worker thread. There are several differences between POSIX
498 * and Windows, of course -- most notably the Windows thread is a
499 * detached thread, and we keep the handle around until we want to get
500 * rid of the thread. The notification scheme also differs: Windows
501 * makes use of semaphores in both directions, POSIX uses a pipe for
502 * integration with 'select()' or alike.
503 */
504 static void
start_blocking_thread_internal(blocking_child * c)505 start_blocking_thread_internal(
506 blocking_child * c
507 )
508 #ifdef SYS_WINNT
509 {
510 BOOL resumed;
511
512 c->thread_ref = NULL;
513 (*addremove_io_semaphore)(c->responses_pending->shnd, FALSE);
514 c->thr_table[0].thnd =
515 (HANDLE)_beginthreadex(
516 NULL,
517 0,
518 &blocking_thread,
519 c,
520 CREATE_SUSPENDED,
521 NULL);
522
523 if (NULL == c->thr_table[0].thnd) {
524 msyslog(LOG_ERR, "start blocking thread failed: %m");
525 exit(-1);
526 }
527 /* remember the thread priority is only within the process class */
528 if (!SetThreadPriority(c->thr_table[0].thnd,
529 THREAD_PRIORITY_BELOW_NORMAL)) {
530 msyslog(LOG_ERR, "Error lowering blocking thread priority: %m");
531 }
532 if (NULL != pSetThreadDescription) {
533 (*pSetThreadDescription)(c->thr_table[0].thnd, L"ntp_worker");
534 }
535 resumed = ResumeThread(c->thr_table[0].thnd);
536 DEBUG_INSIST(resumed);
537 c->thread_ref = &c->thr_table[0];
538 }
539 #else /* pthreads start_blocking_thread_internal() follows */
540 {
541 # ifdef NEED_PTHREAD_INIT
542 static int pthread_init_called;
543 # endif
544 pthread_attr_t thr_attr;
545 int rc;
546 int pipe_ends[2]; /* read then write */
547 int is_pipe;
548 int flags;
549 size_t ostacksize;
550 size_t nstacksize;
551 sigset_t saved_sig_mask;
552
553 c->thread_ref = NULL;
554
555 # ifdef NEED_PTHREAD_INIT
556 /*
557 * from lib/isc/unix/app.c:
558 * BSDI 3.1 seg faults in pthread_sigmask() if we don't do this.
559 */
560 if (!pthread_init_called) {
561 pthread_init();
562 pthread_init_called = TRUE;
563 }
564 # endif
565
566 rc = pipe_socketpair(&pipe_ends[0], &is_pipe);
567 if (0 != rc) {
568 msyslog(LOG_ERR, "start_blocking_thread: pipe_socketpair() %m");
569 exit(1);
570 }
571 c->resp_read_pipe = move_fd(pipe_ends[0]);
572 c->resp_write_pipe = move_fd(pipe_ends[1]);
573 c->ispipe = is_pipe;
574 flags = fcntl(c->resp_read_pipe, F_GETFL, 0);
575 if (-1 == flags) {
576 msyslog(LOG_ERR, "start_blocking_thread: fcntl(F_GETFL) %m");
577 exit(1);
578 }
579 rc = fcntl(c->resp_read_pipe, F_SETFL, O_NONBLOCK | flags);
580 if (-1 == rc) {
581 msyslog(LOG_ERR,
582 "start_blocking_thread: fcntl(F_SETFL, O_NONBLOCK) %m");
583 exit(1);
584 }
585 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, FALSE);
586 pthread_attr_init(&thr_attr);
587 pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED);
588 #if defined(HAVE_PTHREAD_ATTR_GETSTACKSIZE) && \
589 defined(HAVE_PTHREAD_ATTR_SETSTACKSIZE)
590 rc = pthread_attr_getstacksize(&thr_attr, &ostacksize);
591 if (0 != rc) {
592 msyslog(LOG_ERR,
593 "start_blocking_thread: pthread_attr_getstacksize() -> %s",
594 strerror(rc));
595 } else {
596 nstacksize = ostacksize;
597 /* order is important here: first clamp on upper limit,
598 * and the PTHREAD min stack size is ultimate override!
599 */
600 if (nstacksize > THREAD_MAXSTACKSIZE)
601 nstacksize = THREAD_MAXSTACKSIZE;
602 # ifdef PTHREAD_STACK_MAX
603 if (nstacksize > PTHREAD_STACK_MAX)
604 nstacksize = PTHREAD_STACK_MAX;
605 # endif
606
607 /* now clamp on lower stack limit. */
608 if (nstacksize < THREAD_MINSTACKSIZE)
609 nstacksize = THREAD_MINSTACKSIZE;
610 # ifdef PTHREAD_STACK_MIN
611 if (nstacksize < PTHREAD_STACK_MIN)
612 nstacksize = PTHREAD_STACK_MIN;
613 # endif
614
615 if (nstacksize != ostacksize)
616 rc = pthread_attr_setstacksize(&thr_attr, nstacksize);
617 if (0 != rc)
618 msyslog(LOG_ERR,
619 "start_blocking_thread: pthread_attr_setstacksize(0x%lx -> 0x%lx) -> %s",
620 (u_long)ostacksize, (u_long)nstacksize,
621 strerror(rc));
622 }
623 #else
624 UNUSED_ARG(nstacksize);
625 UNUSED_ARG(ostacksize);
626 #endif
627 #if defined(PTHREAD_SCOPE_SYSTEM) && defined(NEED_PTHREAD_SCOPE_SYSTEM)
628 pthread_attr_setscope(&thr_attr, PTHREAD_SCOPE_SYSTEM);
629 #endif
630 c->thread_ref = emalloc_zero(sizeof(*c->thread_ref));
631 block_thread_signals(&saved_sig_mask);
632 rc = pthread_create(&c->thr_table[0], &thr_attr,
633 &blocking_thread, c);
634 pthread_sigmask(SIG_SETMASK, &saved_sig_mask, NULL);
635 pthread_attr_destroy(&thr_attr);
636 if (0 != rc) {
637 msyslog(LOG_ERR, "start_blocking_thread: pthread_create() -> %s",
638 strerror(rc));
639 exit(1);
640 }
641 c->thread_ref = &c->thr_table[0];
642 }
643 #endif
644
645 /* --------------------------------------------------------------------
646 * block_thread_signals()
647 *
648 * Temporarily block signals used by ntpd main thread, so that signal
649 * mask inherited by child threads leaves them blocked. Returns prior
650 * active signal mask via pmask, to be restored by the main thread
651 * after pthread_create().
652 */
653 #ifndef SYS_WINNT
654 void
block_thread_signals(sigset_t * pmask)655 block_thread_signals(
656 sigset_t * pmask
657 )
658 {
659 sigset_t block;
660
661 sigemptyset(&block);
662 # ifdef HAVE_SIGNALED_IO
663 # ifdef SIGIO
664 sigaddset(&block, SIGIO);
665 # endif
666 # ifdef SIGPOLL
667 sigaddset(&block, SIGPOLL);
668 # endif
669 # endif /* HAVE_SIGNALED_IO */
670 sigaddset(&block, SIGALRM);
671 sigaddset(&block, MOREDEBUGSIG);
672 sigaddset(&block, LESSDEBUGSIG);
673 # ifdef SIGDIE1
674 sigaddset(&block, SIGDIE1);
675 # endif
676 # ifdef SIGDIE2
677 sigaddset(&block, SIGDIE2);
678 # endif
679 # ifdef SIGDIE3
680 sigaddset(&block, SIGDIE3);
681 # endif
682 # ifdef SIGDIE4
683 sigaddset(&block, SIGDIE4);
684 # endif
685 # ifdef SIGBUS
686 sigaddset(&block, SIGBUS);
687 # endif
688 sigemptyset(pmask);
689 pthread_sigmask(SIG_BLOCK, &block, pmask);
690 }
691 #endif /* !SYS_WINNT */
692
693
694 /* --------------------------------------------------------------------
695 * Create & destroy semaphores. This is sufficiently different between
696 * POSIX and Windows to warrant wrapper functions and close enough to
697 * use the concept of synchronization via semaphore for all platforms.
698 */
699 static sem_ref
create_sema(sema_type * semptr,u_int inival,u_int maxval)700 create_sema(
701 sema_type* semptr,
702 u_int inival,
703 u_int maxval)
704 {
705 #ifdef SYS_WINNT
706
707 long svini, svmax;
708 if (NULL != semptr) {
709 svini = (inival < LONG_MAX)
710 ? (long)inival : LONG_MAX;
711 svmax = (maxval < LONG_MAX && maxval > 0)
712 ? (long)maxval : LONG_MAX;
713 semptr->shnd = CreateSemaphore(NULL, svini, svmax, NULL);
714 if (NULL == semptr->shnd)
715 semptr = NULL;
716 }
717
718 #else
719
720 (void)maxval;
721 if (semptr && sem_init(semptr, FALSE, inival))
722 semptr = NULL;
723
724 #endif
725
726 return semptr;
727 }
728
729 /* ------------------------------------------------------------------ */
730 static sem_ref
delete_sema(sem_ref obj)731 delete_sema(
732 sem_ref obj)
733 {
734
735 # ifdef SYS_WINNT
736
737 if (obj) {
738 if (obj->shnd)
739 CloseHandle(obj->shnd);
740 obj->shnd = NULL;
741 }
742
743 # else
744
745 if (obj)
746 sem_destroy(obj);
747
748 # endif
749
750 return NULL;
751 }
752
753 /* --------------------------------------------------------------------
754 * prepare_child_sems()
755 *
756 * create sync & access semaphores
757 *
758 * All semaphores are cleared, only the access semaphore has 1 unit.
759 * Childs wait on 'workitems_pending', then grabs 'sema_access'
760 * and dequeues jobs. When done, 'sema_access' is given one unit back.
761 *
762 * The producer grabs 'sema_access', manages the queue, restores
763 * 'sema_access' and puts one unit into 'workitems_pending'.
764 *
765 * The story goes the same for the response queue.
766 */
767 static void
prepare_child_sems(blocking_child * c)768 prepare_child_sems(
769 blocking_child *c
770 )
771 {
772 if (NULL == worker_memlock)
773 worker_memlock = create_sema(&worker_mmutex, 1, 1);
774
775 c->accesslock = create_sema(&c->sem_table[0], 1, 1);
776 c->workitems_pending = create_sema(&c->sem_table[1], 0, 0);
777 c->wake_scheduled_sleep = create_sema(&c->sem_table[2], 0, 1);
778 # ifndef WORK_PIPE
779 c->responses_pending = create_sema(&c->sem_table[3], 0, 0);
780 # endif
781 }
782
783 /* --------------------------------------------------------------------
784 * wait for semaphore. Where the wait can be interrupted, it will
785 * internally resume -- When this function returns, there is either no
786 * semaphore at all, a timeout occurred, or the caller could
787 * successfully take a token from the semaphore.
788 *
789 * For untimed wait, not checking the result of this function at all is
790 * definitely an option.
791 */
792 static int
wait_for_sem(sem_ref sem,struct timespec * timeout)793 wait_for_sem(
794 sem_ref sem,
795 struct timespec * timeout /* wall-clock */
796 )
797 #ifdef SYS_WINNT
798 {
799 struct timespec now;
800 struct timespec delta;
801 DWORD msec;
802 DWORD rc;
803
804 if (!(sem && sem->shnd)) {
805 errno = EINVAL;
806 return -1;
807 }
808
809 if (NULL == timeout) {
810 msec = INFINITE;
811 } else {
812 getclock(TIMEOFDAY, &now);
813 delta = sub_tspec(*timeout, now);
814 if (delta.tv_sec < 0) {
815 msec = 0;
816 } else if ((delta.tv_sec + 1) >= (MAXDWORD / 1000)) {
817 msec = INFINITE;
818 } else {
819 msec = 1000 * (DWORD)delta.tv_sec;
820 msec += delta.tv_nsec / (1000 * 1000);
821 }
822 }
823 rc = WaitForSingleObject(sem->shnd, msec);
824 if (WAIT_OBJECT_0 == rc)
825 return 0;
826 if (WAIT_TIMEOUT == rc) {
827 errno = ETIMEDOUT;
828 return -1;
829 }
830 msyslog(LOG_ERR, "WaitForSingleObject unexpected 0x%x", rc);
831 errno = EFAULT;
832 return -1;
833 }
834 #else /* pthreads wait_for_sem() follows */
835 {
836 int rc = -1;
837
838 if (sem) do {
839 if (NULL == timeout)
840 rc = sem_wait(sem);
841 else
842 rc = sem_timedwait(sem, timeout);
843 } while (rc == -1 && errno == EINTR);
844 else
845 errno = EINVAL;
846
847 return rc;
848 }
849 #endif
850
851 /* --------------------------------------------------------------------
852 * blocking_thread - thread functions have WINAPI (aka 'stdcall')
853 * calling conventions under Windows and POSIX-defined signature
854 * otherwise.
855 */
856 #ifdef SYS_WINNT
857 u_int WINAPI
858 #else
859 void *
860 #endif
blocking_thread(void * ThreadArg)861 blocking_thread(
862 void * ThreadArg
863 )
864 {
865 blocking_child *c;
866
867 c = ThreadArg;
868 exit_worker(blocking_child_common(c));
869
870 /* NOTREACHED */
871 return 0;
872 }
873
874 /* --------------------------------------------------------------------
875 * req_child_exit() runs in the parent.
876 *
877 * This function is called from from the idle timer, too, and possibly
878 * without a thread being there any longer. Since we have folded up our
879 * tent in that case and all the semaphores are already gone, we simply
880 * ignore this request in this case.
881 *
882 * Since the existence of the semaphores is controlled exclusively by
883 * the parent, there's no risk of data race here.
884 */
885 int
req_child_exit(blocking_child * c)886 req_child_exit(
887 blocking_child *c
888 )
889 {
890 return (c->accesslock)
891 ? queue_req_pointer(c, CHILD_EXIT_REQ)
892 : 0;
893 }
894
895 /* --------------------------------------------------------------------
896 * cleanup_after_child() runs in parent.
897 */
898 static void
cleanup_after_child(blocking_child * c)899 cleanup_after_child(
900 blocking_child * c
901 )
902 {
903 DEBUG_INSIST(!c->reusable);
904
905 # ifdef SYS_WINNT
906 /* The thread was not created in detached state, so we better
907 * clean up.
908 */
909 if (c->thread_ref && c->thread_ref->thnd) {
910 WaitForSingleObject(c->thread_ref->thnd, INFINITE);
911 INSIST(CloseHandle(c->thread_ref->thnd));
912 c->thread_ref->thnd = NULL;
913 }
914 # endif
915 c->thread_ref = NULL;
916
917 /* remove semaphores and (if signalling vi IO) pipes */
918
919 c->accesslock = delete_sema(c->accesslock);
920 c->workitems_pending = delete_sema(c->workitems_pending);
921 c->wake_scheduled_sleep = delete_sema(c->wake_scheduled_sleep);
922
923 # ifdef WORK_PIPE
924 DEBUG_INSIST(-1 != c->resp_read_pipe);
925 DEBUG_INSIST(-1 != c->resp_write_pipe);
926 (*addremove_io_fd)(c->resp_read_pipe, c->ispipe, TRUE);
927 close(c->resp_write_pipe);
928 close(c->resp_read_pipe);
929 c->resp_write_pipe = -1;
930 c->resp_read_pipe = -1;
931 # else
932 DEBUG_INSIST(NULL != c->responses_pending);
933 (*addremove_io_semaphore)(c->responses_pending->shnd, TRUE);
934 c->responses_pending = delete_sema(c->responses_pending);
935 # endif
936
937 /* Is it necessary to check if there are pending requests and
938 * responses? If so, and if there are, what to do with them?
939 */
940
941 /* re-init buffer index sequencers */
942 c->head_workitem = 0;
943 c->tail_workitem = 0;
944 c->head_response = 0;
945 c->tail_response = 0;
946
947 c->reusable = TRUE;
948 }
949
950
951 #else /* !WORK_THREAD follows */
952 char work_thread_nonempty_compilation_unit;
953 #endif
954