1 /*
2 * ntp_worker.c
3 */
4 #include <config.h>
5 #include "ntp_workimpl.h"
6
7 #ifdef WORKER
8
9 #include <stdio.h>
10 #include <ctype.h>
11 #include <signal.h>
12
13 #include "iosignal.h"
14 #include "ntp_stdlib.h"
15 #include "ntp_malloc.h"
16 #include "ntp_syslog.h"
17 #include "ntpd.h"
18 #include "ntp_io.h"
19 #include "ntp_assert.h"
20 #include "ntp_unixtime.h"
21 #include "intreswork.h"
22
23
24 #define CHILD_MAX_IDLE (3 * 60) /* seconds, idle worker limit */
25
26 blocking_child ** blocking_children;
27 size_t blocking_children_alloc;
28 int worker_per_query; /* boolean */
29 int intres_req_pending;
30
31
32 #ifndef HAVE_IO_COMPLETION_PORT
33 /*
34 * pipe_socketpair()
35 *
36 * Provides an AF_UNIX socketpair on systems which have them, otherwise
37 * pair of unidirectional pipes.
38 */
39 int
pipe_socketpair(int caller_fds[2],int * is_pipe)40 pipe_socketpair(
41 int caller_fds[2],
42 int * is_pipe
43 )
44 {
45 int rc;
46 int fds[2];
47 int called_pipe;
48
49 #ifdef HAVE_SOCKETPAIR
50 rc = socketpair(AF_UNIX, SOCK_STREAM, 0, &fds[0]);
51 #else
52 rc = -1;
53 #endif
54
55 if (-1 == rc) {
56 rc = pipe(&fds[0]);
57 called_pipe = TRUE;
58 } else {
59 called_pipe = FALSE;
60 }
61
62 if (-1 == rc)
63 return rc;
64
65 caller_fds[0] = fds[0];
66 caller_fds[1] = fds[1];
67 if (is_pipe != NULL)
68 *is_pipe = called_pipe;
69
70 return 0;
71 }
72
73
/*
 * close_all_except()
 *
 * Close every file descriptor other than keep_fd: descriptors
 * 0 .. keep_fd - 1 are closed one by one here, and everything above
 * keep_fd is handed to close_all_beyond().  Return values of close()
 * are ignored.
 */
void
close_all_except(
	int	keep_fd
	)
{
	int	low_fd;

	low_fd = 0;
	while (low_fd < keep_fd) {
		close(low_fd);
		low_fd++;
	}

	close_all_beyond(keep_fd);
}
91
92
/*
 * close_all_beyond()
 *
 * Close every file descriptor numbered above keep_fd, which is the
 * highest descriptor left open.  One of three strategies is chosen at
 * compile time: closefrom(), fcntl(F_CLOSEM), or a loop calling
 * close() on each descriptor up to GETDTABLESIZE().
 */
void
close_all_beyond(
	int	keep_fd
	)
{
	const int	first_fd = keep_fd + 1;
#if defined(HAVE_CLOSEFROM)

	closefrom(first_fd);
#elif defined(F_CLOSEM)

	/*
	 * From 'Writing Reliable AIX Daemons,' SG24-4946-00,
	 * by Eric Agar (saves us from doing 32767 system
	 * calls)
	 */
	if (-1 == fcntl(first_fd, F_CLOSEM, 0))
		msyslog(LOG_ERR, "F_CLOSEM(%d): %m", first_fd);
#else	/* !HAVE_CLOSEFROM && !F_CLOSEM follows */
	int		limit;
	int		fd;

	limit = GETDTABLESIZE();
	fd = first_fd;
	while (fd < limit) {
		close(fd);
		fd++;
	}
#endif	/* !HAVE_CLOSEFROM && !F_CLOSEM */
}
123 #endif /* HAVE_IO_COMPLETION_PORT */
124
125
126 u_int
available_blocking_child_slot(void)127 available_blocking_child_slot(void)
128 {
129 const size_t each = sizeof(blocking_children[0]);
130 u_int slot;
131 size_t prev_alloc;
132 size_t new_alloc;
133 size_t prev_octets;
134 size_t octets;
135
136 for (slot = 0; slot < blocking_children_alloc; slot++) {
137 if (NULL == blocking_children[slot])
138 return slot;
139 if (blocking_children[slot]->reusable) {
140 blocking_children[slot]->reusable = FALSE;
141 return slot;
142 }
143 }
144
145 prev_alloc = blocking_children_alloc;
146 prev_octets = prev_alloc * each;
147 new_alloc = blocking_children_alloc + 4;
148 octets = new_alloc * each;
149 blocking_children = erealloc_zero(blocking_children, octets,
150 prev_octets);
151 blocking_children_alloc = new_alloc;
152
153 /* assume we'll never have enough workers to overflow u_int */
154 return (u_int)prev_alloc;
155 }
156
157
158 int
queue_blocking_request(blocking_work_req rtype,void * req,size_t reqsize,blocking_work_callback done_func,void * context)159 queue_blocking_request(
160 blocking_work_req rtype,
161 void * req,
162 size_t reqsize,
163 blocking_work_callback done_func,
164 void * context
165 )
166 {
167 static u_int intres_slot = UINT_MAX;
168 u_int child_slot;
169 blocking_child * c;
170 blocking_pipe_header req_hdr;
171
172 req_hdr.octets = sizeof(req_hdr) + reqsize;
173 req_hdr.magic_sig = BLOCKING_REQ_MAGIC;
174 req_hdr.rtype = rtype;
175 req_hdr.done_func = done_func;
176 req_hdr.context = context;
177
178 child_slot = UINT_MAX;
179 if (worker_per_query || UINT_MAX == intres_slot ||
180 blocking_children[intres_slot]->reusable)
181 child_slot = available_blocking_child_slot();
182 if (!worker_per_query) {
183 if (UINT_MAX == intres_slot)
184 intres_slot = child_slot;
185 else
186 child_slot = intres_slot;
187 if (0 == intres_req_pending)
188 intres_timeout_req(0);
189 }
190 intres_req_pending++;
191 INSIST(UINT_MAX != child_slot);
192 c = blocking_children[child_slot];
193 if (NULL == c) {
194 c = emalloc_zero(sizeof(*c));
195 #ifdef WORK_FORK
196 c->req_read_pipe = -1;
197 c->req_write_pipe = -1;
198 #endif
199 #ifdef WORK_PIPE
200 c->resp_read_pipe = -1;
201 c->resp_write_pipe = -1;
202 #endif
203 blocking_children[child_slot] = c;
204 }
205 req_hdr.child_idx = child_slot;
206
207 return send_blocking_req_internal(c, &req_hdr, req);
208 }
209
210
queue_blocking_response(blocking_child * c,blocking_pipe_header * resp,size_t respsize,const blocking_pipe_header * req)211 int queue_blocking_response(
212 blocking_child * c,
213 blocking_pipe_header * resp,
214 size_t respsize,
215 const blocking_pipe_header * req
216 )
217 {
218 resp->octets = respsize;
219 resp->magic_sig = BLOCKING_RESP_MAGIC;
220 resp->rtype = req->rtype;
221 resp->context = req->context;
222 resp->done_func = req->done_func;
223
224 return send_blocking_resp_internal(c, resp);
225 }
226
227
228 void
process_blocking_resp(blocking_child * c)229 process_blocking_resp(
230 blocking_child * c
231 )
232 {
233 blocking_pipe_header * resp;
234 void * data;
235
236 /*
237 * On Windows send_blocking_resp_internal() may signal the
238 * blocking_response_ready event multiple times while we're
239 * processing a response, so always consume all available
240 * responses before returning to test the event again.
241 */
242 #ifdef WORK_THREAD
243 do {
244 #endif
245 resp = receive_blocking_resp_internal(c);
246 if (NULL != resp) {
247 DEBUG_REQUIRE(BLOCKING_RESP_MAGIC ==
248 resp->magic_sig);
249 data = (char *)resp + sizeof(*resp);
250 intres_req_pending--;
251 (*resp->done_func)(resp->rtype, resp->context,
252 resp->octets - sizeof(*resp),
253 data);
254 free(resp);
255 }
256 #ifdef WORK_THREAD
257 } while (NULL != resp);
258 #endif
259 if (!worker_per_query && 0 == intres_req_pending)
260 intres_timeout_req(CHILD_MAX_IDLE);
261 else if (worker_per_query)
262 req_child_exit(c);
263 }
264
265
266 /*
267 * blocking_child_common runs as a forked child or a thread
268 */
269 int
blocking_child_common(blocking_child * c)270 blocking_child_common(
271 blocking_child *c
272 )
273 {
274 int say_bye;
275 blocking_pipe_header *req;
276
277 say_bye = FALSE;
278 while (!say_bye) {
279 req = receive_blocking_req_internal(c);
280 if (NULL == req) {
281 say_bye = TRUE;
282 continue;
283 }
284
285 DEBUG_REQUIRE(BLOCKING_REQ_MAGIC == req->magic_sig);
286
287 switch (req->rtype) {
288 case BLOCKING_GETADDRINFO:
289 if (blocking_getaddrinfo(c, req))
290 say_bye = TRUE;
291 break;
292
293 case BLOCKING_GETNAMEINFO:
294 if (blocking_getnameinfo(c, req))
295 say_bye = TRUE;
296 break;
297
298 default:
299 msyslog(LOG_ERR, "unknown req %d to blocking worker", req->rtype);
300 say_bye = TRUE;
301 }
302
303 free(req);
304 }
305
306 return 0;
307 }
308
309
310 /*
311 * worker_idle_timer_fired()
312 *
313 * The parent starts this timer when the last pending response has been
314 * received from the child, making it idle, and clears the timer when a
315 * request is dispatched to the child. Once the timer expires, the
316 * child is sent packing.
317 *
318 * This is called when worker_idle_timer is nonzero and less than or
319 * equal to current_time.
320 */
321 void
worker_idle_timer_fired(void)322 worker_idle_timer_fired(void)
323 {
324 u_int idx;
325 blocking_child * c;
326
327 DEBUG_REQUIRE(0 == intres_req_pending);
328
329 intres_timeout_req(0);
330 for (idx = 0; idx < blocking_children_alloc; idx++) {
331 c = blocking_children[idx];
332 if (NULL == c)
333 continue;
334 req_child_exit(c);
335 }
336 }
337
338
339 #else /* !WORKER follows */
/* sole definition when WORKER is off, so the translation unit is not empty */
int	ntp_worker_nonempty_compilation_unit;
341 #endif
342