xref: /NextBSD/lib/libdispatch/src/source.c (revision 33da5adc555b3bc29986eeadca03829e4ad06b1e)
1 /*
2  * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3  *
4  * @APPLE_APACHE_LICENSE_HEADER_START@
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * @APPLE_APACHE_LICENSE_HEADER_END@
19  */
20 
21 #include "internal.h"
22 #if HAVE_MACH
23 #include "protocol.h"
24 #include "protocolServer.h"
25 #endif
26 #include <sys/mount.h>
27 
28 static void _dispatch_source_merge_kevent(dispatch_source_t ds,
29 		const struct kevent64_s *ke);
30 static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
31 static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg);
32 static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
33 		uint32_t del_flags);
34 static void _dispatch_kevent_drain(struct kevent64_s *ke);
35 static void _dispatch_kevent_merge(struct kevent64_s *ke);
36 static void _dispatch_timers_kevent(struct kevent64_s *ke);
37 static void _dispatch_timers_unregister(dispatch_source_t ds,
38 		dispatch_kevent_t dk);
39 static void _dispatch_timers_update(dispatch_source_t ds);
40 static void _dispatch_timer_aggregates_check(void);
41 static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
42 static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
43 		unsigned int tidx);
44 static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
45 		unsigned int tidx);
46 static inline unsigned long _dispatch_source_timer_data(
47 		dispatch_source_refs_t dr, unsigned long prev);
48 static long _dispatch_kq_update(const struct kevent64_s *);
49 static void _dispatch_memorystatus_init(void);
50 #if HAVE_MACH
51 static void _dispatch_mach_host_calendar_change_register(void);
52 static void _dispatch_mach_recv_msg_buf_init(void);
53 static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
54 		uint32_t new_flags, uint32_t del_flags);
55 static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
56 		uint32_t new_flags, uint32_t del_flags);
57 static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke);
58 #else
_dispatch_mach_host_calendar_change_register(void)59 static inline void _dispatch_mach_host_calendar_change_register(void) {}
_dispatch_mach_recv_msg_buf_init(void)60 static inline void _dispatch_mach_recv_msg_buf_init(void) {}
61 #endif
62 static const char * _evfiltstr(short filt);
63 #if DISPATCH_DEBUG
64 static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str);
65 static void _dispatch_kevent_debugger(void *context);
66 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
67 	dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
68 #else
69 static inline void
_dispatch_kevent_debug(struct kevent64_s * kev DISPATCH_UNUSED,const char * str DISPATCH_UNUSED)70 _dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED,
71 		const char* str DISPATCH_UNUSED) {}
72 #define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
73 #endif
74 
75 #pragma mark -
76 #pragma mark dispatch_source_t
77 
78 dispatch_source_t
dispatch_source_create(dispatch_source_type_t type,uintptr_t handle,unsigned long mask,dispatch_queue_t q)79 dispatch_source_create(dispatch_source_type_t type,
80 	uintptr_t handle,
81 	unsigned long mask,
82 	dispatch_queue_t q)
83 {
84 	const struct kevent64_s *proto_kev = &type->ke;
85 	dispatch_source_t ds;
86 	dispatch_kevent_t dk;
87 
88 	// input validation
89 	if (type == NULL || (mask & ~type->mask)) {
90 		return NULL;
91 	}
92 
93 	switch (type->ke.filter) {
94 	case EVFILT_SIGNAL:
95 		if (handle >= NSIG) {
96 			return NULL;
97 		}
98 		break;
99 	case EVFILT_FS:
100 #if DISPATCH_USE_VM_PRESSURE
101 	case EVFILT_VM:
102 #endif
103 #if DISPATCH_USE_MEMORYSTATUS
104 	case EVFILT_MEMORYSTATUS:
105 #endif
106 	case DISPATCH_EVFILT_CUSTOM_ADD:
107 	case DISPATCH_EVFILT_CUSTOM_OR:
108 		if (handle) {
109 			return NULL;
110 		}
111 		break;
112 	case DISPATCH_EVFILT_TIMER:
113 		if (!!handle ^ !!type->ke.ident) {
114 			return NULL;
115 		}
116 		break;
117 	default:
118 		break;
119 	}
120 
121 	ds = _dispatch_alloc(DISPATCH_VTABLE(source),
122 			sizeof(struct dispatch_source_s));
123 	// Initialize as a queue first, then override some settings below.
124 	_dispatch_queue_init((dispatch_queue_t)ds);
125 	ds->dq_label = "source";
126 
127 	ds->do_ref_cnt++; // the reference the manager queue holds
128 	ds->do_ref_cnt++; // since source is created suspended
129 	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
130 	// The initial target queue is the manager queue, in order to get
131 	// the source installed. <rdar://problem/8928171>
132 	ds->do_targetq = &_dispatch_mgr_q;
133 
134 	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
135 	dk->dk_kevent = *proto_kev;
136 	dk->dk_kevent.ident = handle;
137 	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
138 	dk->dk_kevent.fflags |= (uint32_t)mask;
139 	dk->dk_kevent.udata = (uintptr_t)dk;
140 	TAILQ_INIT(&dk->dk_sources);
141 
142 	ds->ds_dkev = dk;
143 	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
144 	ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
145 	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
146 		ds->ds_is_level = true;
147 		ds->ds_needs_rearm = true;
148 	} else if (!(EV_CLEAR & proto_kev->flags)) {
149 		// we cheat and use EV_CLEAR to mean a "flag thingy"
150 		ds->ds_is_adder = true;
151 	}
152 	// Some sources require special processing
153 	if (type->init != NULL) {
154 		type->init(ds, type, handle, mask, q);
155 	}
156 	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
157 
158 	if (fastpath(!ds->ds_refs)) {
159 		ds->ds_refs = _dispatch_calloc(1ul,
160 				sizeof(struct dispatch_source_refs_s));
161 	}
162 	ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
163 
164 	// First item on the queue sets the user-specified target queue
165 	dispatch_set_target_queue(ds, q);
166 	_dispatch_object_debug(ds, "%s", __func__);
167 	return ds;
168 }
169 
170 void
_dispatch_source_dispose(dispatch_source_t ds)171 _dispatch_source_dispose(dispatch_source_t ds)
172 {
173 	_dispatch_object_debug(ds, "%s", __func__);
174 	free(ds->ds_refs);
175 	_dispatch_queue_destroy(ds);
176 }
177 
178 void
_dispatch_source_xref_dispose(dispatch_source_t ds)179 _dispatch_source_xref_dispose(dispatch_source_t ds)
180 {
181 	_dispatch_wakeup(ds);
182 }
183 
184 void
dispatch_source_cancel(dispatch_source_t ds)185 dispatch_source_cancel(dispatch_source_t ds)
186 {
187 	_dispatch_object_debug(ds, "%s", __func__);
188 	// Right after we set the cancel flag, someone else
189 	// could potentially invoke the source, do the cancelation,
190 	// unregister the source, and deallocate it. We would
191 	// need to therefore retain/release before setting the bit
192 
193 	_dispatch_retain(ds);
194 	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
195 	_dispatch_wakeup(ds);
196 	_dispatch_release(ds);
197 }
198 
199 long
dispatch_source_testcancel(dispatch_source_t ds)200 dispatch_source_testcancel(dispatch_source_t ds)
201 {
202 	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
203 }
204 
205 unsigned long
dispatch_source_get_mask(dispatch_source_t ds)206 dispatch_source_get_mask(dispatch_source_t ds)
207 {
208 	unsigned long mask = ds->ds_pending_data_mask;
209 	if (ds->ds_vmpressure_override) {
210 		mask = NOTE_VM_PRESSURE;
211 	}
212 #if TARGET_IPHONE_SIMULATOR
213 	else if (ds->ds_memorystatus_override) {
214 		mask = NOTE_MEMORYSTATUS_PRESSURE_WARN;
215 	}
216 #endif
217 	return mask;
218 }
219 
220 uintptr_t
dispatch_source_get_handle(dispatch_source_t ds)221 dispatch_source_get_handle(dispatch_source_t ds)
222 {
223 	unsigned int handle = (unsigned int)ds->ds_ident_hack;
224 #if TARGET_IPHONE_SIMULATOR
225 	if (ds->ds_memorystatus_override) {
226 		handle = 0;
227 	}
228 #endif
229 	return handle;
230 }
231 
232 unsigned long
dispatch_source_get_data(dispatch_source_t ds)233 dispatch_source_get_data(dispatch_source_t ds)
234 {
235 	unsigned long data = ds->ds_data;
236 	if (ds->ds_vmpressure_override) {
237 		data = NOTE_VM_PRESSURE;
238 	}
239 #if TARGET_IPHONE_SIMULATOR
240 	else if (ds->ds_memorystatus_override) {
241 		data = NOTE_MEMORYSTATUS_PRESSURE_WARN;
242 	}
243 #endif
244 	return data;
245 }
246 
247 void
dispatch_source_merge_data(dispatch_source_t ds,unsigned long val)248 dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
249 {
250 	struct kevent64_s kev = {
251 		.fflags = (typeof(kev.fflags))val,
252 		.data = (typeof(kev.data))val,
253 	};
254 
255 	dispatch_assert(
256 			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
257 			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
258 
259 	_dispatch_source_merge_kevent(ds, &kev);
260 }
261 
262 #pragma mark -
263 #pragma mark dispatch_source_handler
264 
265 DISPATCH_ALWAYS_INLINE
266 static inline dispatch_continuation_t
_dispatch_source_handler_alloc(dispatch_source_t ds,void * handler,long kind,bool block)267 _dispatch_source_handler_alloc(dispatch_source_t ds, void *handler, long kind,
268 		bool block)
269 {
270 	dispatch_continuation_t dc = _dispatch_continuation_alloc();
271 	if (handler) {
272 		dc->do_vtable = (void *)((block ? DISPATCH_OBJ_BLOCK_RELEASE_BIT :
273 				DISPATCH_OBJ_CTXT_FETCH_BIT) | (kind != DS_EVENT_HANDLER ?
274 				DISPATCH_OBJ_ASYNC_BIT : 0l));
275 		dc->dc_priority = 0;
276 		dc->dc_voucher = NULL;
277 		if (block) {
278 #ifdef __BLOCKS__
279 			if (slowpath(_dispatch_block_has_private_data(handler))) {
280 				// sources don't propagate priority by default
281 				dispatch_block_flags_t flags = DISPATCH_BLOCK_NO_QOS_CLASS;
282 				flags |= _dispatch_block_get_flags(handler);
283 				_dispatch_continuation_priority_set(dc,
284 						_dispatch_block_get_priority(handler), flags);
285 			}
286 			if (kind != DS_EVENT_HANDLER) {
287 				dc->dc_func = _dispatch_call_block_and_release;
288 			} else {
289 				dc->dc_func = _dispatch_Block_invoke(handler);
290 			}
291 			dc->dc_ctxt = _dispatch_Block_copy(handler);
292 #endif /* __BLOCKS__ */
293 		} else {
294 			dc->dc_func = handler;
295 			dc->dc_ctxt = ds->do_ctxt;
296 		}
297 		_dispatch_trace_continuation_push((dispatch_queue_t)ds, dc);
298 	} else {
299 		dc->dc_func = NULL;
300 	}
301 	dc->dc_data = (void*)kind;
302 	return dc;
303 }
304 
305 static inline void
_dispatch_source_handler_replace(dispatch_source_refs_t dr,long kind,dispatch_continuation_t dc_new)306 _dispatch_source_handler_replace(dispatch_source_refs_t dr, long kind,
307 		dispatch_continuation_t dc_new)
308 {
309 	dispatch_continuation_t dc = dr->ds_handler[kind];
310 	if (dc) {
311 #ifdef __BLOCKS__
312 		if ((long)dc->do_vtable & DISPATCH_OBJ_BLOCK_RELEASE_BIT) {
313 			Block_release(dc->dc_ctxt);
314 		}
315 #endif /* __BLOCKS__ */
316 		if (dc->dc_voucher) {
317 			_voucher_release(dc->dc_voucher);
318 			dc->dc_voucher = NULL;
319 		}
320 		_dispatch_continuation_free(dc);
321 	}
322 	dr->ds_handler[kind] = dc_new;
323 }
324 
325 static inline void
_dispatch_source_handler_free(dispatch_source_refs_t dr,long kind)326 _dispatch_source_handler_free(dispatch_source_refs_t dr, long kind)
327 {
328 	_dispatch_source_handler_replace(dr, kind, NULL);
329 }
330 
331 static void
_dispatch_source_set_handler(void * context)332 _dispatch_source_set_handler(void *context)
333 {
334 	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
335 	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
336 	dispatch_continuation_t dc = context;
337 	long kind = (long)dc->dc_data;
338 	dc->dc_data = 0;
339 	if (!dc->dc_func) {
340 		_dispatch_continuation_free(dc);
341 		dc = NULL;
342 	} else if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
343 		dc->dc_ctxt = ds->do_ctxt;
344 	}
345 	_dispatch_source_handler_replace(ds->ds_refs, kind, dc);
346 	if (kind == DS_EVENT_HANDLER && dc && dc->dc_priority) {
347 #if HAVE_PTHREAD_WORKQUEUE_QOS
348 		ds->dq_priority = dc->dc_priority & ~_PTHREAD_PRIORITY_FLAGS_MASK;
349 		_dispatch_queue_set_override_priority((dispatch_queue_t)ds);
350 #endif
351 	}
352 }
353 
#ifdef __BLOCKS__
// Set (or, with a NULL block, clear) the event handler block of a source.
// The change is applied by _dispatch_source_set_handler() as a barrier on
// the source.
void
dispatch_source_set_event_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc = _dispatch_source_handler_alloc(ds, handler,
			DS_EVENT_HANDLER, true);

	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */
365 
366 void
dispatch_source_set_event_handler_f(dispatch_source_t ds,dispatch_function_t handler)367 dispatch_source_set_event_handler_f(dispatch_source_t ds,
368 		dispatch_function_t handler)
369 {
370 	dispatch_continuation_t dc;
371 	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
372 	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
373 			_dispatch_source_set_handler);
374 }
375 
376 void
_dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds,void * ctxt,dispatch_function_t handler)377 _dispatch_source_set_event_handler_with_context_f(dispatch_source_t ds,
378 		void *ctxt, dispatch_function_t handler)
379 {
380 	dispatch_continuation_t dc;
381 	dc = _dispatch_source_handler_alloc(ds, handler, DS_EVENT_HANDLER, false);
382 	dc->do_vtable = (void *)((long)dc->do_vtable &~DISPATCH_OBJ_CTXT_FETCH_BIT);
383 	dc->dc_other = dc->dc_ctxt;
384 	dc->dc_ctxt = ctxt;
385 	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
386 			_dispatch_source_set_handler);
387 }
388 
#ifdef __BLOCKS__
// Set (or, with a NULL block, clear) the cancellation handler block of a
// source. The change is applied by _dispatch_source_set_handler() as a
// barrier on the source.
void
dispatch_source_set_cancel_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc = _dispatch_source_handler_alloc(ds, handler,
			DS_CANCEL_HANDLER, true);

	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */
400 
401 void
dispatch_source_set_cancel_handler_f(dispatch_source_t ds,dispatch_function_t handler)402 dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
403 		dispatch_function_t handler)
404 {
405 	dispatch_continuation_t dc;
406 	dc = _dispatch_source_handler_alloc(ds, handler, DS_CANCEL_HANDLER, false);
407 	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
408 			_dispatch_source_set_handler);
409 }
410 
#ifdef __BLOCKS__
// Set (or, with a NULL block, clear) the registration handler block of a
// source. The change is applied by _dispatch_source_set_handler() as a
// barrier on the source.
void
dispatch_source_set_registration_handler(dispatch_source_t ds,
		dispatch_block_t handler)
{
	dispatch_continuation_t dc = _dispatch_source_handler_alloc(ds, handler,
			DS_REGISTN_HANDLER, true);

	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
			_dispatch_source_set_handler);
}
#endif /* __BLOCKS__ */
422 
423 void
dispatch_source_set_registration_handler_f(dispatch_source_t ds,dispatch_function_t handler)424 dispatch_source_set_registration_handler_f(dispatch_source_t ds,
425 	dispatch_function_t handler)
426 {
427 	dispatch_continuation_t dc;
428 	dc = _dispatch_source_handler_alloc(ds, handler, DS_REGISTN_HANDLER, false);
429 	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, dc,
430 			_dispatch_source_set_handler);
431 }
432 
433 #pragma mark -
434 #pragma mark dispatch_source_invoke
435 
436 static void
_dispatch_source_registration_callout(dispatch_source_t ds)437 _dispatch_source_registration_callout(dispatch_source_t ds)
438 {
439 	dispatch_source_refs_t dr = ds->ds_refs;
440 	dispatch_continuation_t dc = dr->ds_handler[DS_REGISTN_HANDLER];
441 	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
442 		// no registration callout if source is canceled rdar://problem/8955246
443 		return _dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
444 	}
445 	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
446 	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
447 		dc->dc_ctxt = ds->do_ctxt;
448 	}
449 	_dispatch_continuation_pop(dc);
450 	dr->ds_handler[DS_REGISTN_HANDLER] = NULL;
451 	_dispatch_reset_defaultpriority(old_dp);
452 }
453 
454 static void
_dispatch_source_cancel_callout(dispatch_source_t ds)455 _dispatch_source_cancel_callout(dispatch_source_t ds)
456 {
457 	dispatch_source_refs_t dr = ds->ds_refs;
458 	dispatch_continuation_t dc = dr->ds_handler[DS_CANCEL_HANDLER];
459 	ds->ds_pending_data_mask = 0;
460 	ds->ds_pending_data = 0;
461 	ds->ds_data = 0;
462 	_dispatch_source_handler_free(dr, DS_EVENT_HANDLER);
463 	_dispatch_source_handler_free(dr, DS_REGISTN_HANDLER);
464 	if (!dc) {
465 		return;
466 	}
467 	if (!(ds->ds_atomic_flags & DSF_CANCELED)) {
468 		return _dispatch_source_handler_free(dr, DS_CANCEL_HANDLER);
469 	}
470 	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
471 	if ((long)dc->do_vtable & DISPATCH_OBJ_CTXT_FETCH_BIT) {
472 		dc->dc_ctxt = ds->do_ctxt;
473 	}
474 	_dispatch_continuation_pop(dc);
475 	dr->ds_handler[DS_CANCEL_HANDLER] = NULL;
476 	_dispatch_reset_defaultpriority(old_dp);
477 }
478 
479 static void
_dispatch_source_latch_and_call(dispatch_source_t ds)480 _dispatch_source_latch_and_call(dispatch_source_t ds)
481 {
482 	unsigned long prev;
483 
484 	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
485 		return;
486 	}
487 	dispatch_source_refs_t dr = ds->ds_refs;
488 	dispatch_continuation_t dc = dr->ds_handler[DS_EVENT_HANDLER];
489 	prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
490 	if (ds->ds_is_level) {
491 		ds->ds_data = ~prev;
492 	} else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
493 		ds->ds_data = _dispatch_source_timer_data(dr, prev);
494 	} else {
495 		ds->ds_data = prev;
496 	}
497 	if (!dispatch_assume(prev) || !dc) {
498 		return;
499 	}
500 	pthread_priority_t old_dp = _dispatch_set_defaultpriority(ds->dq_priority);
501 	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dc);
502 	voucher_t voucher = dc->dc_voucher ? _voucher_retain(dc->dc_voucher) : NULL;
503 	_dispatch_continuation_voucher_adopt(dc); // consumes voucher reference
504 	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
505 	_dispatch_introspection_queue_item_complete(dc);
506 	if (voucher) dc->dc_voucher = voucher;
507 	_dispatch_reset_defaultpriority(old_dp);
508 }
509 
510 static void
_dispatch_source_kevent_unregister(dispatch_source_t ds)511 _dispatch_source_kevent_unregister(dispatch_source_t ds)
512 {
513 	_dispatch_object_debug(ds, "%s", __func__);
514 	dispatch_kevent_t dk = ds->ds_dkev;
515 	ds->ds_dkev = NULL;
516 	switch (dk->dk_kevent.filter) {
517 	case DISPATCH_EVFILT_TIMER:
518 		_dispatch_timers_unregister(ds, dk);
519 		break;
520 	default:
521 		TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
522 		_dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask);
523 		break;
524 	}
525 
526 	(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
527 	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
528 	_dispatch_release(ds); // the retain is done at creation time
529 }
530 
531 static void
_dispatch_source_kevent_resume(dispatch_source_t ds,uint32_t new_flags)532 _dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
533 {
534 	switch (ds->ds_dkev->dk_kevent.filter) {
535 	case DISPATCH_EVFILT_TIMER:
536 		return _dispatch_timers_update(ds);
537 	case EVFILT_MACHPORT:
538 		if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
539 			new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
540 		}
541 		break;
542 	}
543 	if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
544 		_dispatch_source_kevent_unregister(ds);
545 	}
546 }
547 
548 static void
_dispatch_source_kevent_register(dispatch_source_t ds)549 _dispatch_source_kevent_register(dispatch_source_t ds)
550 {
551 	dispatch_assert_zero(ds->ds_is_installed);
552 	switch (ds->ds_dkev->dk_kevent.filter) {
553 	case DISPATCH_EVFILT_TIMER:
554 		return _dispatch_timers_update(ds);
555 	}
556 	uint32_t flags;
557 	bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
558 	TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
559 	if (do_resume || ds->ds_needs_rearm) {
560 		_dispatch_source_kevent_resume(ds, flags);
561 	}
562 	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
563 	_dispatch_object_debug(ds, "%s", __func__);
564 }
565 
566 DISPATCH_ALWAYS_INLINE
567 static inline dispatch_queue_t
_dispatch_source_invoke2(dispatch_object_t dou,_dispatch_thread_semaphore_t * sema_ptr DISPATCH_UNUSED)568 _dispatch_source_invoke2(dispatch_object_t dou,
569 		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
570 {
571 	dispatch_source_t ds = dou._ds;
572 	if (slowpath(_dispatch_queue_drain(ds))) {
573 		DISPATCH_CLIENT_CRASH("Sync onto source");
574 	}
575 
576 	// This function performs all source actions. Each action is responsible
577 	// for verifying that it takes place on the appropriate queue. If the
578 	// current queue is not the correct queue for this action, the correct queue
579 	// will be returned and the invoke will be re-driven on that queue.
580 
581 	// The order of tests here in invoke and in probe should be consistent.
582 
583 	dispatch_queue_t dq = _dispatch_queue_get_current();
584 	dispatch_source_refs_t dr = ds->ds_refs;
585 
586 	if (!ds->ds_is_installed) {
587 		// The source needs to be installed on the manager queue.
588 		if (dq != &_dispatch_mgr_q) {
589 			return &_dispatch_mgr_q;
590 		}
591 		_dispatch_source_kevent_register(ds);
592 		ds->ds_is_installed = true;
593 		if (dr->ds_handler[DS_REGISTN_HANDLER]) {
594 			return ds->do_targetq;
595 		}
596 		if (slowpath(ds->do_xref_cnt == -1)) {
597 			return &_dispatch_mgr_q; // rdar://problem/9558246
598 		}
599 	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
600 		// Source suspended by an item drained from the source queue.
601 		return NULL;
602 	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
603 		// The source has been registered and the registration handler needs
604 		// to be delivered on the target queue.
605 		if (dq != ds->do_targetq) {
606 			return ds->do_targetq;
607 		}
608 		// clears ds_registration_handler
609 		_dispatch_source_registration_callout(ds);
610 		if (slowpath(ds->do_xref_cnt == -1)) {
611 			return &_dispatch_mgr_q; // rdar://problem/9558246
612 		}
613 	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
614 		// The source has been cancelled and needs to be uninstalled from the
615 		// manager queue. After uninstallation, the cancellation handler needs
616 		// to be delivered to the target queue.
617 		if (ds->ds_dkev) {
618 			if (dq != &_dispatch_mgr_q) {
619 				return &_dispatch_mgr_q;
620 			}
621 			_dispatch_source_kevent_unregister(ds);
622 		}
623 		if (dr->ds_handler[DS_EVENT_HANDLER] ||
624 				dr->ds_handler[DS_CANCEL_HANDLER] ||
625 				dr->ds_handler[DS_REGISTN_HANDLER]) {
626 			if (dq != ds->do_targetq) {
627 				return ds->do_targetq;
628 			}
629 		}
630 		_dispatch_source_cancel_callout(ds);
631 	} else if (ds->ds_pending_data) {
632 		// The source has pending data to deliver via the event handler callback
633 		// on the target queue. Some sources need to be rearmed on the manager
634 		// queue after event delivery.
635 		if (dq != ds->do_targetq) {
636 			return ds->do_targetq;
637 		}
638 		_dispatch_source_latch_and_call(ds);
639 		if (ds->ds_needs_rearm) {
640 			return &_dispatch_mgr_q;
641 		}
642 	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
643 		// The source needs to be rearmed on the manager queue.
644 		if (dq != &_dispatch_mgr_q) {
645 			return &_dispatch_mgr_q;
646 		}
647 		_dispatch_source_kevent_resume(ds, 0);
648 		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
649 	}
650 
651 	return NULL;
652 }
653 
654 DISPATCH_NOINLINE
655 void
_dispatch_source_invoke(dispatch_source_t ds)656 _dispatch_source_invoke(dispatch_source_t ds)
657 {
658 	_dispatch_queue_class_invoke(ds, _dispatch_source_invoke2);
659 }
660 
661 unsigned long
_dispatch_source_probe(dispatch_source_t ds)662 _dispatch_source_probe(dispatch_source_t ds)
663 {
664 	// This function determines whether the source needs to be invoked.
665 	// The order of tests here in probe and in invoke should be consistent.
666 
667 	dispatch_source_refs_t dr = ds->ds_refs;
668 	if (!ds->ds_is_installed) {
669 		// The source needs to be installed on the manager queue.
670 		return true;
671 	} else if (dr->ds_handler[DS_REGISTN_HANDLER]) {
672 		// The registration handler needs to be delivered to the target queue.
673 		return true;
674 	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
675 		// The source needs to be uninstalled from the manager queue, or the
676 		// cancellation handler needs to be delivered to the target queue.
677 		// Note: cancellation assumes installation.
678 		if (ds->ds_dkev || dr->ds_handler[DS_EVENT_HANDLER] ||
679 				dr->ds_handler[DS_CANCEL_HANDLER] ||
680 				dr->ds_handler[DS_REGISTN_HANDLER]) {
681 			return true;
682 		}
683 	} else if (ds->ds_pending_data) {
684 		// The source has pending data to deliver to the target queue.
685 		return true;
686 	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
687 		// The source needs to be rearmed on the manager queue.
688 		return true;
689 	}
690 	return _dispatch_queue_class_probe(ds);
691 }
692 
693 static void
_dispatch_source_merge_kevent(dispatch_source_t ds,const struct kevent64_s * ke)694 _dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke)
695 {
696 	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
697 		return;
698 	}
699 	if (ds->ds_is_level) {
700 		// ke->data is signed and "negative available data" makes no sense
701 		// zero bytes happens when EV_EOF is set
702 		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
703 		dispatch_assert(ke->data >= 0l);
704 		dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
705 				relaxed);
706 	} else if (ds->ds_is_adder) {
707 		(void)dispatch_atomic_add2o(ds, ds_pending_data,
708 				(unsigned long)ke->data, relaxed);
709 	} else if (ke->fflags & ds->ds_pending_data_mask) {
710 		(void)dispatch_atomic_or2o(ds, ds_pending_data,
711 				ke->fflags & ds->ds_pending_data_mask, relaxed);
712 	}
713 	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
714 	if (ds->ds_needs_rearm) {
715 		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
716 	}
717 
718 	_dispatch_wakeup(ds);
719 }
720 
721 #pragma mark -
722 #pragma mark dispatch_kevent_t
723 
// File-descriptor guarding hooks for dispatch kevents. Implemented elsewhere
// when DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD is enabled; otherwise they are
// no-ops that merely consume their argument.
#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
static void _dispatch_kevent_guard(dispatch_kevent_t dk);
static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
#else
static inline void
_dispatch_kevent_guard(dispatch_kevent_t dk)
{
	(void)dk;
}

static inline void
_dispatch_kevent_unguard(dispatch_kevent_t dk)
{
	(void)dk;
}
#endif
731 
732 static struct dispatch_kevent_s _dispatch_kevent_data_or = {
733 	.dk_kevent = {
734 		.filter = DISPATCH_EVFILT_CUSTOM_OR,
735 		.flags = EV_CLEAR,
736 	},
737 	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
738 };
739 static struct dispatch_kevent_s _dispatch_kevent_data_add = {
740 	.dk_kevent = {
741 		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
742 	},
743 	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
744 };
745 
746 #define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
747 
748 DISPATCH_CACHELINE_ALIGN
749 static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
750 
751 static void
_dispatch_kevent_init()752 _dispatch_kevent_init()
753 {
754 	unsigned int i;
755 	for (i = 0; i < DSL_HASH_SIZE; i++) {
756 		TAILQ_INIT(&_dispatch_sources[i]);
757 	}
758 
759 	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
760 			&_dispatch_kevent_data_or, dk_list);
761 	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
762 			&_dispatch_kevent_data_add, dk_list);
763 	_dispatch_kevent_data_or.dk_kevent.udata =
764 			(uintptr_t)&_dispatch_kevent_data_or;
765 	_dispatch_kevent_data_add.dk_kevent.udata =
766 			(uintptr_t)&_dispatch_kevent_data_add;
767 }
768 
// Map a kevent (ident, filter) pair to a bucket index in _dispatch_sources.
//
// With Mach support, Mach port and Mach notification kevents hash on
// MACH_PORT_INDEX(ident); everything else hashes on the ident itself.
static inline uintptr_t
_dispatch_kevent_hash(uint64_t ident, short filter)
{
	uint64_t key;
#if HAVE_MACH
	if (filter == EVFILT_MACHPORT ||
			filter == DISPATCH_EVFILT_MACH_NOTIFICATION) {
		key = MACH_PORT_INDEX(ident);
	} else {
		key = ident;
	}
#else
	key = ident;
#endif
	return DSL_HASH((uintptr_t)key);
}
782 
783 static dispatch_kevent_t
_dispatch_kevent_find(uint64_t ident,short filter)784 _dispatch_kevent_find(uint64_t ident, short filter)
785 {
786 	uintptr_t hash = _dispatch_kevent_hash(ident, filter);
787 	dispatch_kevent_t dki;
788 
789 	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
790 		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
791 			break;
792 		}
793 	}
794 	return dki;
795 }
796 
797 static void
_dispatch_kevent_insert(dispatch_kevent_t dk)798 _dispatch_kevent_insert(dispatch_kevent_t dk)
799 {
800 	_dispatch_kevent_guard(dk);
801 	uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
802 			dk->dk_kevent.filter);
803 	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
804 }
805 
806 // Find existing kevents, and merge any new flags if necessary
807 static bool
_dispatch_kevent_register(dispatch_kevent_t * dkp,uint32_t * flgp)808 _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
809 {
810 	dispatch_kevent_t dk, ds_dkev = *dkp;
811 	uint32_t new_flags;
812 	bool do_resume = false;
813 
814 	dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
815 			ds_dkev->dk_kevent.filter);
816 	if (dk) {
817 		// If an existing dispatch kevent is found, check to see if new flags
818 		// need to be added to the existing kevent
819 		new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
820 		dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
821 		free(ds_dkev);
822 		*dkp = dk;
823 		do_resume = new_flags;
824 	} else {
825 		dk = ds_dkev;
826 		_dispatch_kevent_insert(dk);
827 		new_flags = dk->dk_kevent.fflags;
828 		do_resume = true;
829 	}
830 	// Re-register the kevent with the kernel if new flags were added
831 	// by the dispatch kevent
832 	if (do_resume) {
833 		dk->dk_kevent.flags |= EV_ADD;
834 	}
835 	*flgp = new_flags;
836 	return do_resume;
837 }
838 
839 static bool
_dispatch_kevent_resume(dispatch_kevent_t dk,uint32_t new_flags,uint32_t del_flags)840 _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
841 		uint32_t del_flags)
842 {
843 	long r;
844 	switch (dk->dk_kevent.filter) {
845 	case DISPATCH_EVFILT_TIMER:
846 	case DISPATCH_EVFILT_CUSTOM_ADD:
847 	case DISPATCH_EVFILT_CUSTOM_OR:
848 		// these types not registered with kevent
849 		return 0;
850 #if HAVE_MACH
851 	case EVFILT_MACHPORT:
852 		return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
853 	case DISPATCH_EVFILT_MACH_NOTIFICATION:
854 		return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
855 #endif
856 	case EVFILT_PROC:
857 		if (dk->dk_kevent.flags & EV_ONESHOT) {
858 			return 0;
859 		}
860 		// fall through
861 	default:
862 		r = _dispatch_kq_update(&dk->dk_kevent);
863 		if (dk->dk_kevent.flags & EV_DISPATCH) {
864 			dk->dk_kevent.flags &= ~EV_ADD;
865 		}
866 		return r;
867 	}
868 }
869 
870 static void
_dispatch_kevent_dispose(dispatch_kevent_t dk)871 _dispatch_kevent_dispose(dispatch_kevent_t dk)
872 {
873 	uintptr_t hash;
874 
875 	switch (dk->dk_kevent.filter) {
876 	case DISPATCH_EVFILT_TIMER:
877 	case DISPATCH_EVFILT_CUSTOM_ADD:
878 	case DISPATCH_EVFILT_CUSTOM_OR:
879 		// these sources live on statically allocated lists
880 		return;
881 #if HAVE_MACH
882 	case EVFILT_MACHPORT:
883 		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
884 		break;
885 	case DISPATCH_EVFILT_MACH_NOTIFICATION:
886 		_dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
887 		break;
888 #endif
889 	case EVFILT_PROC:
890 		if (dk->dk_kevent.flags & EV_ONESHOT) {
891 			break; // implicitly deleted
892 		}
893 		// fall through
894 	default:
895 		if (~dk->dk_kevent.flags & EV_DELETE) {
896 			dk->dk_kevent.flags |= EV_DELETE;
897 			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
898 			_dispatch_kq_update(&dk->dk_kevent);
899 		}
900 		break;
901 	}
902 
903 	hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
904 			dk->dk_kevent.filter);
905 	TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
906 	_dispatch_kevent_unguard(dk);
907 	free(dk);
908 }
909 
910 static void
_dispatch_kevent_unregister(dispatch_kevent_t dk,uint32_t flg)911 _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg)
912 {
913 	dispatch_source_refs_t dri;
914 	uint32_t del_flags, fflags = 0;
915 
916 	if (TAILQ_EMPTY(&dk->dk_sources)) {
917 		_dispatch_kevent_dispose(dk);
918 	} else {
919 		TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
920 			dispatch_source_t dsi = _dispatch_source_from_refs(dri);
921 			uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
922 			fflags |= mask;
923 		}
924 		del_flags = flg & ~fflags;
925 		if (del_flags) {
926 			dk->dk_kevent.flags |= EV_ADD;
927 			dk->dk_kevent.fflags = fflags;
928 			_dispatch_kevent_resume(dk, 0, del_flags);
929 		}
930 	}
931 }
932 
933 DISPATCH_NOINLINE
934 static void
_dispatch_kevent_proc_exit(struct kevent64_s * ke)935 _dispatch_kevent_proc_exit(struct kevent64_s *ke)
936 {
937 	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
938 	// <rdar://problem/5067725>. As a workaround, we simulate an exit event for
939 	// any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
940 	struct kevent64_s fake;
941 	fake = *ke;
942 	fake.flags &= ~EV_ERROR;
943 	fake.fflags = NOTE_EXIT;
944 	fake.data = 0;
945 	_dispatch_kevent_drain(&fake);
946 }
947 
948 DISPATCH_NOINLINE
949 static void
_dispatch_kevent_error(struct kevent64_s * ke)950 _dispatch_kevent_error(struct kevent64_s *ke)
951 {
952 	_dispatch_kevent_debug(ke, __func__);
953 	if (ke->data) {
954 		// log the unexpected error
955 		_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
956 				ke->flags & EV_DELETE ? "delete" :
957 				ke->flags & EV_ADD ? "add" :
958 				ke->flags & EV_ENABLE ? "enable" : "monitor",
959 				(int)ke->data);
960 	}
961 }
962 
963 static void
_dispatch_kevent_drain(struct kevent64_s * ke)964 _dispatch_kevent_drain(struct kevent64_s *ke)
965 {
966 #if DISPATCH_DEBUG
967 	static dispatch_once_t pred;
968 	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
969 #endif
970 	if (ke->filter == EVFILT_USER) {
971 		return;
972 	}
973 	if (slowpath(ke->flags & EV_ERROR)) {
974 		if (ke->filter == EVFILT_PROC) {
975 			if (ke->flags & EV_DELETE) {
976 				// Process exited while monitored
977 				return;
978 			} else if (ke->data == ESRCH) {
979 				return _dispatch_kevent_proc_exit(ke);
980 			}
981 		}
982 		return _dispatch_kevent_error(ke);
983 	}
984 	_dispatch_kevent_debug(ke, __func__);
985 	if (ke->filter == EVFILT_TIMER) {
986 		return _dispatch_timers_kevent(ke);
987 	}
988 #if HAVE_MACH
989 	if (ke->filter == EVFILT_MACHPORT) {
990 		return _dispatch_kevent_mach_portset(ke);
991 	}
992 #endif
993 	return _dispatch_kevent_merge(ke);
994 }
995 
996 DISPATCH_NOINLINE
997 static void
_dispatch_kevent_merge(struct kevent64_s * ke)998 _dispatch_kevent_merge(struct kevent64_s *ke)
999 {
1000 	dispatch_kevent_t dk;
1001 	dispatch_source_refs_t dri;
1002 
1003 	dk = (void*)ke->udata;
1004 	dispatch_assert(dk);
1005 
1006 	if (ke->flags & EV_ONESHOT) {
1007 		dk->dk_kevent.flags |= EV_ONESHOT;
1008 	}
1009 	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1010 		_dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
1011 	}
1012 }
1013 
1014 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1015 static void
_dispatch_kevent_guard(dispatch_kevent_t dk)1016 _dispatch_kevent_guard(dispatch_kevent_t dk)
1017 {
1018 	guardid_t guard;
1019 	const unsigned int guard_flags = GUARD_CLOSE;
1020 	int r, fd_flags = 0;
1021 	switch (dk->dk_kevent.filter) {
1022 	case EVFILT_READ:
1023 	case EVFILT_WRITE:
1024 	case EVFILT_VNODE:
1025 		guard = &dk->dk_kevent;
1026 		r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
1027 				&guard, guard_flags, &fd_flags);
1028 		if (slowpath(r == -1)) {
1029 			int err = errno;
1030 			if (err != EPERM) {
1031 				(void)dispatch_assume_zero(err);
1032 			}
1033 			return;
1034 		}
1035 		dk->dk_kevent.ext[0] = guard_flags;
1036 		dk->dk_kevent.ext[1] = fd_flags;
1037 		break;
1038 	}
1039 }
1040 
1041 static void
_dispatch_kevent_unguard(dispatch_kevent_t dk)1042 _dispatch_kevent_unguard(dispatch_kevent_t dk)
1043 {
1044 	guardid_t guard;
1045 	unsigned int guard_flags;
1046 	int r, fd_flags;
1047 	switch (dk->dk_kevent.filter) {
1048 	case EVFILT_READ:
1049 	case EVFILT_WRITE:
1050 	case EVFILT_VNODE:
1051 		guard_flags = (unsigned int)dk->dk_kevent.ext[0];
1052 		if (!guard_flags) {
1053 			return;
1054 		}
1055 		guard = &dk->dk_kevent;
1056 		fd_flags = (int)dk->dk_kevent.ext[1];
1057 		r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
1058 				guard_flags, NULL, 0, &fd_flags);
1059 		if (slowpath(r == -1)) {
1060 			(void)dispatch_assume_zero(errno);
1061 			return;
1062 		}
1063 		dk->dk_kevent.ext[0] = 0;
1064 		break;
1065 	}
1066 }
1067 #endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1068 
1069 #pragma mark -
1070 #pragma mark dispatch_source_timer
1071 
// Tracing hooks for the "next timer to fire" of each QoS class. They compile
// to nothing unless DISPATCH_USE_DTRACE is enabled, in which case the next
// timer is remembered per QoS so that the program / wake probes can refer to
// it.
#if !DISPATCH_USE_DTRACE
#define _dispatch_trace_next_timer_set(x, q)
#define _dispatch_trace_next_timer_program(d, q)
#define _dispatch_trace_next_timer_wake(q)
#else
static dispatch_source_refs_t
		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
#define _dispatch_trace_next_timer_set(x, q) \
		_dispatch_trace_next_timer[(q)] = (x)
#define _dispatch_trace_next_timer_program(d, q) \
		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
#define _dispatch_trace_next_timer_wake(q) \
		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
#endif

// Timer telemetry is compiled out in this build
#define _dispatch_source_timer_telemetry_enabled() false
1088 
1089 DISPATCH_NOINLINE
1090 static void
_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,uintptr_t ident,struct dispatch_timer_source_s * values)1091 _dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
1092 		uintptr_t ident, struct dispatch_timer_source_s *values)
1093 {
1094 	if (_dispatch_trace_timer_configure_enabled()) {
1095 		_dispatch_trace_timer_configure(ds, ident, values);
1096 	}
1097 }
1098 
1099 DISPATCH_ALWAYS_INLINE
1100 static inline void
_dispatch_source_timer_telemetry(dispatch_source_t ds,uintptr_t ident,struct dispatch_timer_source_s * values)1101 _dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
1102 		struct dispatch_timer_source_s *values)
1103 {
1104 	if (_dispatch_trace_timer_configure_enabled() ||
1105 			_dispatch_source_timer_telemetry_enabled()) {
1106 		_dispatch_source_timer_telemetry_slow(ds, ident, values);
1107 		asm(""); // prevent tailcall
1108 	}
1109 }
1110 
// Upper bound used for "forever" intervals, in nanoseconds:
// approx 1 year (60s * 60m * 24h * 365d)
#define FOREVER_NSEC 31536000000000000ull
1113 
1114 DISPATCH_ALWAYS_INLINE
1115 static inline uint64_t
_dispatch_source_timer_now(uint64_t nows[],unsigned int tidx)1116 _dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
1117 {
1118 	unsigned int tk = DISPATCH_TIMER_KIND(tidx);
1119 	if (nows && fastpath(nows[tk])) {
1120 		return nows[tk];
1121 	}
1122 	uint64_t now;
1123 	switch (tk) {
1124 	case DISPATCH_TIMER_KIND_MACH:
1125 		now = _dispatch_absolute_time();
1126 		break;
1127 	case DISPATCH_TIMER_KIND_WALL:
1128 		now = _dispatch_get_nanoseconds();
1129 		break;
1130 	}
1131 	if (nows) {
1132 		nows[tk] = now;
1133 	}
1134 	return now;
1135 }
1136 
1137 static inline unsigned long
_dispatch_source_timer_data(dispatch_source_refs_t dr,unsigned long prev)1138 _dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1139 {
1140 	// calculate the number of intervals since last fire
1141 	unsigned long data, missed;
1142 	uint64_t now;
1143 	now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
1144 	missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1145 			ds_timer(dr).interval);
1146 	// correct for missed intervals already delivered last time
1147 	data = prev - ds_timer(dr).missed + missed;
1148 	ds_timer(dr).missed = missed;
1149 	return data;
1150 }
1151 
1152 struct dispatch_set_timer_params {
1153 	dispatch_source_t ds;
1154 	uintptr_t ident;
1155 	struct dispatch_timer_source_s values;
1156 };
1157 
1158 static void
_dispatch_source_set_timer3(void * context)1159 _dispatch_source_set_timer3(void *context)
1160 {
1161 	// Called on the _dispatch_mgr_q
1162 	struct dispatch_set_timer_params *params = context;
1163 	dispatch_source_t ds = params->ds;
1164 	ds->ds_ident_hack = params->ident;
1165 	ds_timer(ds->ds_refs) = params->values;
1166 	// Clear any pending data that might have accumulated on
1167 	// older timer params <rdar://problem/8574886>
1168 	ds->ds_pending_data = 0;
1169 	// Re-arm in case we got disarmed because of pending set_timer suspension
1170 	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release);
1171 	dispatch_resume(ds);
1172 	// Must happen after resume to avoid getting disarmed due to suspension
1173 	_dispatch_timers_update(ds);
1174 	dispatch_release(ds);
1175 	if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
1176 		_dispatch_mach_host_calendar_change_register();
1177 	}
1178 	free(params);
1179 }
1180 
1181 static void
_dispatch_source_set_timer2(void * context)1182 _dispatch_source_set_timer2(void *context)
1183 {
1184 	// Called on the source queue
1185 	struct dispatch_set_timer_params *params = context;
1186 	dispatch_suspend(params->ds);
1187 	_dispatch_barrier_async_detached_f(&_dispatch_mgr_q, params,
1188 			_dispatch_source_set_timer3);
1189 }
1190 
1191 DISPATCH_NOINLINE
1192 static struct dispatch_set_timer_params *
_dispatch_source_timer_params(dispatch_source_t ds,dispatch_time_t start,uint64_t interval,uint64_t leeway)1193 _dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
1194 		uint64_t interval, uint64_t leeway)
1195 {
1196 	struct dispatch_set_timer_params *params;
1197 	params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
1198 	params->ds = ds;
1199 	params->values.flags = ds_timer(ds->ds_refs).flags;
1200 
1201 	if (interval == 0) {
1202 		// we use zero internally to mean disabled
1203 		interval = 1;
1204 	} else if ((int64_t)interval < 0) {
1205 		// 6866347 - make sure nanoseconds won't overflow
1206 		interval = INT64_MAX;
1207 	}
1208 	if ((int64_t)leeway < 0) {
1209 		leeway = INT64_MAX;
1210 	}
1211 	if (start == DISPATCH_TIME_NOW) {
1212 		start = _dispatch_absolute_time();
1213 	} else if (start == DISPATCH_TIME_FOREVER) {
1214 		start = INT64_MAX;
1215 	}
1216 
1217 	if ((int64_t)start < 0) {
1218 		// wall clock
1219 		start = (dispatch_time_t)-((int64_t)start);
1220 		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1221 	} else {
1222 		// absolute clock
1223 		interval = _dispatch_time_nano2mach(interval);
1224 		if (interval < 1) {
1225 			// rdar://problem/7287561 interval must be at least one in
1226 			// in order to avoid later division by zero when calculating
1227 			// the missed interval count. (NOTE: the wall clock's
1228 			// interval is already "fixed" to be 1 or more)
1229 			interval = 1;
1230 		}
1231 		leeway = _dispatch_time_nano2mach(leeway);
1232 		params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
1233 	}
1234 	params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
1235 	params->values.target = start;
1236 	params->values.deadline = (start < UINT64_MAX - leeway) ?
1237 			start + leeway : UINT64_MAX;
1238 	params->values.interval = interval;
1239 	params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
1240 			leeway : interval / 2;
1241 	return params;
1242 }
1243 
1244 DISPATCH_ALWAYS_INLINE
1245 static inline void
_dispatch_source_set_timer(dispatch_source_t ds,dispatch_time_t start,uint64_t interval,uint64_t leeway,bool source_sync)1246 _dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1247 		uint64_t interval, uint64_t leeway, bool source_sync)
1248 {
1249 	if (slowpath(!ds->ds_is_timer) ||
1250 			slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
1251 		DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1252 	}
1253 
1254 	struct dispatch_set_timer_params *params;
1255 	params = _dispatch_source_timer_params(ds, start, interval, leeway);
1256 
1257 	_dispatch_source_timer_telemetry(ds, params->ident, &params->values);
1258 	// Suspend the source so that it doesn't fire with pending changes
1259 	// The use of suspend/resume requires the external retain/release
1260 	dispatch_retain(ds);
1261 	if (source_sync) {
1262 		return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params,
1263 				_dispatch_source_set_timer2);
1264 	} else {
1265 		return _dispatch_source_set_timer2(params);
1266 	}
1267 }
1268 
1269 void
dispatch_source_set_timer(dispatch_source_t ds,dispatch_time_t start,uint64_t interval,uint64_t leeway)1270 dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1271 		uint64_t interval, uint64_t leeway)
1272 {
1273 	_dispatch_source_set_timer(ds, start, interval, leeway, true);
1274 }
1275 
1276 void
_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,dispatch_time_t start,uint64_t interval,uint64_t leeway)1277 _dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
1278 		dispatch_time_t start, uint64_t interval, uint64_t leeway)
1279 {
1280 	// Don't serialize through the source queue for CF timers <rdar://13833190>
1281 	_dispatch_source_set_timer(ds, start, interval, leeway, false);
1282 }
1283 
1284 void
_dispatch_source_set_interval(dispatch_source_t ds,uint64_t interval)1285 _dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
1286 {
1287 	dispatch_source_refs_t dr = ds->ds_refs;
1288 	#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1289 	const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
1290 	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
1291 			FOREVER_NSEC/NSEC_PER_MSEC))) {
1292 		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
1293 	} else {
1294 		interval = FOREVER_NSEC;
1295 	}
1296 	interval = _dispatch_time_nano2mach(interval);
1297 	uint64_t target = _dispatch_absolute_time() + interval;
1298 	target = (target / interval) * interval;
1299 	const uint64_t leeway = animation ?
1300 			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
1301 	ds_timer(dr).target = target;
1302 	ds_timer(dr).deadline = target + leeway;
1303 	ds_timer(dr).interval = interval;
1304 	ds_timer(dr).leeway = leeway;
1305 	_dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
1306 }
1307 
1308 #pragma mark -
1309 #pragma mark dispatch_timers
1310 
1311 #define DISPATCH_TIMER_STRUCT(refs) \
1312 	uint64_t target, deadline; \
1313 	TAILQ_HEAD(, refs) dt_sources
1314 
1315 typedef struct dispatch_timer_s {
1316 	DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
1317 } *dispatch_timer_t;
1318 
1319 #define DISPATCH_TIMER_INITIALIZER(tidx) \
1320 	[tidx] = { \
1321 		.target = UINT64_MAX, \
1322 		.deadline = UINT64_MAX, \
1323 		.dt_sources = TAILQ_HEAD_INITIALIZER( \
1324 				_dispatch_timer[tidx].dt_sources), \
1325 	}
1326 #define DISPATCH_TIMER_INIT(kind, qos) \
1327 		DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1328 		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1329 
1330 static struct dispatch_timer_s _dispatch_timer[] =  {
1331 	DISPATCH_TIMER_INIT(WALL, NORMAL),
1332 	DISPATCH_TIMER_INIT(WALL, CRITICAL),
1333 	DISPATCH_TIMER_INIT(WALL, BACKGROUND),
1334 	DISPATCH_TIMER_INIT(MACH, NORMAL),
1335 	DISPATCH_TIMER_INIT(MACH, CRITICAL),
1336 	DISPATCH_TIMER_INIT(MACH, BACKGROUND),
1337 };
1338 #define DISPATCH_TIMER_COUNT \
1339 		((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1340 
1341 #define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1342 	(uintptr_t)&_dispatch_kevent_timer[tidx]
1343 #ifdef __LP64__
1344 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1345 		.udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1346 #else // __LP64__
1347 // dynamic initialization in _dispatch_timers_init()
1348 #define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1349 		.udata = 0
1350 #endif // __LP64__
1351 #define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1352 	[tidx] = { \
1353 		.dk_kevent = { \
1354 			.ident = tidx, \
1355 			.filter = DISPATCH_EVFILT_TIMER, \
1356 			DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1357 		}, \
1358 		.dk_sources = TAILQ_HEAD_INITIALIZER( \
1359 				_dispatch_kevent_timer[tidx].dk_sources), \
1360 	}
1361 #define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1362 		DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1363 		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1364 
1365 static struct dispatch_kevent_s _dispatch_kevent_timer[] = {
1366 	DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
1367 	DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
1368 	DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
1369 	DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
1370 	DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
1371 	DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
1372 	DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
1373 };
1374 #define DISPATCH_KEVENT_TIMER_COUNT \
1375 		((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1376 
1377 #define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1378 #define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
1379 	[qos] = { \
1380 		.ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
1381 		.filter = EVFILT_TIMER, \
1382 		.flags = EV_ONESHOT, \
1383 		.fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1384 	}
1385 #define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
1386 		DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)
1387 
1388 static struct kevent64_s _dispatch_kevent_timeout[] = {
1389 	DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0),
1390 	DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL),
1391 	DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND),
1392 };
1393 
1394 #define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1395 		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1396 
1397 static const uint64_t _dispatch_kevent_coalescing_window[] = {
1398 	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
1399 	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
1400 	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
1401 };
1402 
// Insert a timer into the lists of index tidx.
//
// For a regular index the timer is placed in dra[tidx].dk_sources ordered by
// target (through dr / dr_list) and in dta[tidx].dt_sources ordered by
// deadline (through dt / dt_list); in both lists it goes in front of the
// first entry with a strictly later value, or at the tail.
// For DISPATCH_TIMER_INDEX_DISARM it is only appended to
// dra[tidx].dk_sources.
#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
	typeof(dr) dri = NULL; typeof(dt) dti; \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
			if (ds_timer(dr).target < ds_timer(dri).target) { \
				break; \
			} \
		} \
		TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
			if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
				break; \
			} \
		} \
		if (dti) { \
			TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
		} else { \
			TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
		} \
	} \
	if (dri) { \
		TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
	} else { \
		TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
	} \
	})
1428 
// Remove a timer from the lists of index tidx: from dta[tidx].dt_sources
// unless tidx is the disarm index, and from the dk_sources list of dk (or of
// dra[tidx] when dk is NULL).
#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
	({ \
	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
		TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
	} \
	TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
			dr_list); })
1436 
// Refresh the cached target/deadline of every timer index whose QoS bit is
// set in _dispatch_timers_qos_mask, from the head of the target-ordered list
// (dra[tidx].dk_sources) and of the deadline-ordered list
// (dta[tidx].dt_sources). An empty list yields UINT64_MAX.
//
// Evaluates to true when any cached value changed.
//
// Each list head is checked before being dereferenced. Both lists are
// populated and emptied together by _dispatch_timers_insert() /
// _dispatch_timers_remove(), so guarding the deadline on dt rather than on dr
// is equivalent for consistent lists.
#define _dispatch_timers_check(dra, dta) ({ \
	unsigned int qosm = _dispatch_timers_qos_mask; \
	bool update = false; \
	unsigned int tidx; \
	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
		if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
			continue; \
		} \
		dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dra[tidx].dk_sources); \
		dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
				TAILQ_FIRST(&dta[tidx].dt_sources); \
		uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
		uint64_t deadline = dt ? ds_timer(dt).deadline : UINT64_MAX; \
		if (target != dta[tidx].target) { \
			dta[tidx].target = target; \
			update = true; \
		} \
		if (deadline != dta[tidx].deadline) { \
			dta[tidx].deadline = deadline; \
			update = true; \
		} \
	} \
	update; })
1461 
1462 static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
1463 static unsigned int _dispatch_timers_qos_mask;
1464 static bool _dispatch_timers_force_max_leeway;
1465 
1466 static void
_dispatch_timers_init(void)1467 _dispatch_timers_init(void)
1468 {
1469 #ifndef __LP64__
1470 	unsigned int tidx;
1471 	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1472 		_dispatch_kevent_timer[tidx].dk_kevent.udata = \
1473 				DISPATCH_KEVENT_TIMER_UDATA(tidx);
1474 	}
1475 #endif // __LP64__
1476 	if (slowpath(getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY"))) {
1477 		_dispatch_timers_force_max_leeway = true;
1478 	}
1479 }
1480 
1481 static inline void
_dispatch_timers_unregister(dispatch_source_t ds,dispatch_kevent_t dk)1482 _dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
1483 {
1484 	dispatch_source_refs_t dr = ds->ds_refs;
1485 	unsigned int tidx = (unsigned int)dk->dk_kevent.ident;
1486 
1487 	if (slowpath(ds_timer_aggregate(ds))) {
1488 		_dispatch_timer_aggregates_unregister(ds, tidx);
1489 	}
1490 	_dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
1491 			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1492 	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1493 		_dispatch_timers_reconfigure = true;
1494 		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1495 	}
1496 }
1497 
1498 // Updates the ordered list of timers based on next fire date for changes to ds.
1499 // Should only be called from the context of _dispatch_mgr_q.
1500 static void
_dispatch_timers_update(dispatch_source_t ds)1501 _dispatch_timers_update(dispatch_source_t ds)
1502 {
1503 	dispatch_kevent_t dk = ds->ds_dkev;
1504 	dispatch_source_refs_t dr = ds->ds_refs;
1505 	unsigned int tidx;
1506 
1507 	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1508 
1509 	// Do not reschedule timers unregistered with _dispatch_kevent_unregister()
1510 	if (slowpath(!dk)) {
1511 		return;
1512 	}
1513 	// Move timers that are disabled, suspended or have missed intervals to the
1514 	// disarmed list, rearm after resume resp. source invoke will reenable them
1515 	if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1516 			ds->ds_pending_data) {
1517 		tidx = DISPATCH_TIMER_INDEX_DISARM;
1518 		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
1519 	} else {
1520 		tidx = _dispatch_source_timer_idx(dr);
1521 	}
1522 	if (slowpath(ds_timer_aggregate(ds))) {
1523 		_dispatch_timer_aggregates_register(ds);
1524 	}
1525 	if (slowpath(!ds->ds_is_installed)) {
1526 		ds->ds_is_installed = true;
1527 		if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1528 			(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
1529 		}
1530 		_dispatch_object_debug(ds, "%s", __func__);
1531 		ds->ds_dkev = NULL;
1532 		free(dk);
1533 	} else {
1534 		_dispatch_timers_unregister(ds, dk);
1535 	}
1536 	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1537 		_dispatch_timers_reconfigure = true;
1538 		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1539 	}
1540 	if (dk != &_dispatch_kevent_timer[tidx]){
1541 		ds->ds_dkev = &_dispatch_kevent_timer[tidx];
1542 	}
1543 	_dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
1544 			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1545 	if (slowpath(ds_timer_aggregate(ds))) {
1546 		_dispatch_timer_aggregates_update(ds, tidx);
1547 	}
1548 }
1549 
1550 static inline void
_dispatch_timers_run2(uint64_t nows[],unsigned int tidx)1551 _dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
1552 {
1553 	dispatch_source_refs_t dr;
1554 	dispatch_source_t ds;
1555 	uint64_t now, missed;
1556 
1557 	now = _dispatch_source_timer_now(nows, tidx);
1558 	while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
1559 		ds = _dispatch_source_from_refs(dr);
1560 		// We may find timers on the wrong list due to a pending update from
1561 		// dispatch_source_set_timer. Force an update of the list in that case.
1562 		if (tidx != ds->ds_ident_hack) {
1563 			_dispatch_timers_update(ds);
1564 			continue;
1565 		}
1566 		if (!ds_timer(dr).target) {
1567 			// No configured timers on the list
1568 			break;
1569 		}
1570 		if (ds_timer(dr).target > now) {
1571 			// Done running timers for now.
1572 			break;
1573 		}
1574 		// Remove timers that are suspended or have missed intervals from the
1575 		// list, rearm after resume resp. source invoke will reenable them
1576 		if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1577 			_dispatch_timers_update(ds);
1578 			continue;
1579 		}
1580 		// Calculate number of missed intervals.
1581 		missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1582 		if (++missed > INT_MAX) {
1583 			missed = INT_MAX;
1584 		}
1585 		if (ds_timer(dr).interval < INT64_MAX) {
1586 			ds_timer(dr).target += missed * ds_timer(dr).interval;
1587 			ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
1588 		} else {
1589 			ds_timer(dr).target = UINT64_MAX;
1590 			ds_timer(dr).deadline = UINT64_MAX;
1591 		}
1592 		_dispatch_timers_update(ds);
1593 		ds_timer(dr).last_fire = now;
1594 
1595 		unsigned long data;
1596 		data = dispatch_atomic_add2o(ds, ds_pending_data,
1597 				(unsigned long)missed, relaxed);
1598 		_dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
1599 		_dispatch_wakeup(ds);
1600 	}
1601 }
1602 
1603 DISPATCH_NOINLINE
1604 static void
_dispatch_timers_run(uint64_t nows[])1605 _dispatch_timers_run(uint64_t nows[])
1606 {
1607 	unsigned int tidx;
1608 	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1609 		if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
1610 			_dispatch_timers_run2(nows, tidx);
1611 		}
1612 	}
1613 }
1614 
1615 static inline unsigned int
_dispatch_timers_get_delay(uint64_t nows[],struct dispatch_timer_s timer[],uint64_t * delay,uint64_t * leeway,int qos)1616 _dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
1617 		uint64_t *delay, uint64_t *leeway, int qos)
1618 {
1619 	unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
1620 	uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;
1621 
1622 	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1623 		if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
1624 			continue;
1625 		}
1626 		uint64_t target = timer[tidx].target;
1627 		if (target == UINT64_MAX) {
1628 			continue;
1629 		}
1630 		uint64_t deadline = timer[tidx].deadline;
1631 		if (qos >= 0) {
1632 			// Timer pre-coalescing <rdar://problem/13222034>
1633 			uint64_t window = _dispatch_kevent_coalescing_window[qos];
1634 			uint64_t latest = deadline > window ? deadline - window : 0;
1635 			dispatch_source_refs_t dri;
1636 			TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
1637 					dr_list) {
1638 				tmp = ds_timer(dri).target;
1639 				if (tmp > latest) break;
1640 				target = tmp;
1641 			}
1642 		}
1643 		uint64_t now = _dispatch_source_timer_now(nows, tidx);
1644 		if (target <= now) {
1645 			delta = 0;
1646 			break;
1647 		}
1648 		tmp = target - now;
1649 		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1650 			tmp = _dispatch_time_mach2nano(tmp);
1651 		}
1652 		if (tmp < INT64_MAX && tmp < delta) {
1653 			ridx = tidx;
1654 			delta = tmp;
1655 		}
1656 		dispatch_assert(target <= deadline);
1657 		tmp = deadline - now;
1658 		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1659 			tmp = _dispatch_time_mach2nano(tmp);
1660 		}
1661 		if (tmp < INT64_MAX && tmp < dldelta) {
1662 			dldelta = tmp;
1663 		}
1664 	}
1665 	*delay = delta;
1666 	*leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
1667 	return ridx;
1668 }
1669 
1670 static bool
_dispatch_timers_program2(uint64_t nows[],struct kevent64_s * ke,unsigned int qos)1671 _dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke,
1672 		unsigned int qos)
1673 {
1674 	unsigned int tidx;
1675 	bool poll;
1676 	uint64_t delay, leeway;
1677 
1678 	tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
1679 			(int)qos);
1680 	poll = (delay == 0);
1681 	if (poll || delay == UINT64_MAX) {
1682 		_dispatch_trace_next_timer_set(NULL, qos);
1683 		if (!ke->data) {
1684 			return poll;
1685 		}
1686 		ke->data = 0;
1687 		ke->flags |= EV_DELETE;
1688 		ke->flags &= ~(EV_ADD|EV_ENABLE);
1689 	} else {
1690 		_dispatch_trace_next_timer_set(
1691 				TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos);
1692 		_dispatch_trace_next_timer_program(delay, qos);
1693 		delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
1694 		if (slowpath(_dispatch_timers_force_max_leeway)) {
1695 			ke->data = (int64_t)(delay + leeway);
1696 			ke->ext[1] = 0;
1697 		} else {
1698 			ke->data = (int64_t)delay;
1699 			ke->ext[1] = leeway;
1700 		}
1701 		ke->flags |= EV_ADD|EV_ENABLE;
1702 		ke->flags &= ~EV_DELETE;
1703 	}
1704 	_dispatch_kq_update(ke);
1705 	return poll;
1706 }
1707 
1708 DISPATCH_NOINLINE
1709 static bool
_dispatch_timers_program(uint64_t nows[])1710 _dispatch_timers_program(uint64_t nows[])
1711 {
1712 	bool poll = false;
1713 	unsigned int qos, qosm = _dispatch_timers_qos_mask;
1714 	for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
1715 		if (!(qosm & 1 << qos)){
1716 			continue;
1717 		}
1718 		poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos],
1719 				qos);
1720 	}
1721 	return poll;
1722 }
1723 
1724 DISPATCH_NOINLINE
1725 static bool
_dispatch_timers_configure(void)1726 _dispatch_timers_configure(void)
1727 {
1728 	_dispatch_timer_aggregates_check();
1729 	// Find out if there is a new target/deadline on the timer lists
1730 	return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
1731 }
1732 
1733 static void
_dispatch_timers_calendar_change(void)1734 _dispatch_timers_calendar_change(void)
1735 {
1736 	// calendar change may have gone past the wallclock deadline
1737 	_dispatch_timer_expired = true;
1738 	_dispatch_timers_qos_mask = ~0u;
1739 }
1740 
1741 static void
_dispatch_timers_kevent(struct kevent64_s * ke)1742 _dispatch_timers_kevent(struct kevent64_s *ke)
1743 {
1744 	dispatch_assert(ke->data > 0);
1745 	dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
1746 			DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
1747 	unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
1748 	dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT);
1749 	dispatch_assert(_dispatch_kevent_timeout[qos].data);
1750 	_dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT
1751 	_dispatch_timer_expired = true;
1752 	_dispatch_timers_qos_mask |= 1 << qos;
1753 	_dispatch_trace_next_timer_wake(qos);
1754 }
1755 
1756 static inline bool
_dispatch_mgr_timers(void)1757 _dispatch_mgr_timers(void)
1758 {
1759 	uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {};
1760 	bool expired = slowpath(_dispatch_timer_expired);
1761 	if (expired) {
1762 		_dispatch_timers_run(nows);
1763 	}
1764 	bool reconfigure = slowpath(_dispatch_timers_reconfigure);
1765 	if (reconfigure || expired) {
1766 		if (reconfigure) {
1767 			reconfigure = _dispatch_timers_configure();
1768 			_dispatch_timers_reconfigure = false;
1769 		}
1770 		if (reconfigure || expired) {
1771 			expired = _dispatch_timer_expired = _dispatch_timers_program(nows);
1772 			expired = expired || _dispatch_mgr_q.dq_items_tail;
1773 		}
1774 		_dispatch_timers_qos_mask = 0;
1775 	}
1776 	return expired;
1777 }
1778 
1779 #pragma mark -
1780 #pragma mark dispatch_timer_aggregate
1781 
// One slot of a timer aggregate's dta_kevent_timer[] array: the head of a
// tail queue of dispatch_timer_source_aggregate_refs_s entries.
typedef struct {
	TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
} dispatch_timer_aggregate_refs_s;
1785 
// A timer aggregate. It begins with the queue headers and is cast to
// dispatch_queue_t by the functions below.
typedef struct dispatch_timer_aggregate_s {
	DISPATCH_STRUCT_HEADER(queue);
	DISPATCH_QUEUE_HEADER;
	// Linkage on the global _dispatch_timer_aggregates list
	TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
	// Per-aggregate source lists; passed to _dispatch_timers_check(),
	// _dispatch_timers_insert() and _dispatch_timers_remove()
	dispatch_timer_aggregate_refs_s
			dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
	struct {
		DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
	} dta_timer[DISPATCH_TIMER_COUNT];
	// Copy of each timer's target/deadline, written by
	// _dispatch_timer_aggregate_update() and read through
	// dispatch_timer_aggregate_get_delay()
	struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
	// Number of registered sources; the aggregate is on the global list
	// while this is non-zero
	unsigned int dta_refcount;
} dispatch_timer_aggregate_s;
1798 
// Global list of timer aggregates that currently have at least one
// registered source (see _dispatch_timer_aggregates_register/unregister).
typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
		TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);
1802 
1803 dispatch_timer_aggregate_t
dispatch_timer_aggregate_create(void)1804 dispatch_timer_aggregate_create(void)
1805 {
1806 	unsigned int tidx;
1807 	dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
1808 			sizeof(struct dispatch_timer_aggregate_s));
1809 	_dispatch_queue_init((dispatch_queue_t)dta);
1810 	dta->do_targetq = _dispatch_get_root_queue(
1811 			_DISPATCH_QOS_CLASS_USER_INITIATED, true);
1812 	dta->dq_width = DISPATCH_QUEUE_WIDTH_MAX;
1813 	//FIXME: aggregates need custom vtable
1814 	//dta->dq_label = "timer-aggregate";
1815 	for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
1816 		TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
1817 	}
1818 	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1819 		TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
1820 		dta->dta_timer[tidx].target = UINT64_MAX;
1821 		dta->dta_timer[tidx].deadline = UINT64_MAX;
1822 		dta->dta_timer_data[tidx].target = UINT64_MAX;
1823 		dta->dta_timer_data[tidx].deadline = UINT64_MAX;
1824 	}
1825 	return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
1826 			(dispatch_queue_t)dta);
1827 }
1828 
// Request/response record for dispatch_timer_aggregate_get_delay(): `timer`
// is the input timer array; `delay` and `leeway` are filled in by
// _dispatch_timers_get_delay().
typedef struct dispatch_timer_delay_s {
	dispatch_timer_t timer;
	uint64_t delay, leeway;
} *dispatch_timer_delay_t;
1833 
1834 static void
_dispatch_timer_aggregate_get_delay(void * ctxt)1835 _dispatch_timer_aggregate_get_delay(void *ctxt)
1836 {
1837 	dispatch_timer_delay_t dtd = ctxt;
1838 	struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
1839 	_dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
1840 			-1);
1841 }
1842 
1843 uint64_t
dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,uint64_t * leeway_ptr)1844 dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
1845 		uint64_t *leeway_ptr)
1846 {
1847 	struct dispatch_timer_delay_s dtd = {
1848 		.timer = dta->dta_timer_data,
1849 	};
1850 	dispatch_sync_f((dispatch_queue_t)dta, &dtd,
1851 			_dispatch_timer_aggregate_get_delay);
1852 	if (leeway_ptr) {
1853 		*leeway_ptr = dtd.leeway;
1854 	}
1855 	return dtd.delay;
1856 }
1857 
1858 static void
_dispatch_timer_aggregate_update(void * ctxt)1859 _dispatch_timer_aggregate_update(void *ctxt)
1860 {
1861 	dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
1862 	dispatch_timer_t dtau = ctxt;
1863 	unsigned int tidx;
1864 	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1865 		dta->dta_timer_data[tidx].target = dtau[tidx].target;
1866 		dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
1867 	}
1868 	free(dtau);
1869 }
1870 
1871 DISPATCH_NOINLINE
1872 static void
_dispatch_timer_aggregates_configure(void)1873 _dispatch_timer_aggregates_configure(void)
1874 {
1875 	dispatch_timer_aggregate_t dta;
1876 	dispatch_timer_t dtau;
1877 	TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
1878 		if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
1879 			continue;
1880 		}
1881 		dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
1882 		memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
1883 		_dispatch_barrier_async_detached_f((dispatch_queue_t)dta, dtau,
1884 				_dispatch_timer_aggregate_update);
1885 	}
1886 }
1887 
1888 static inline void
_dispatch_timer_aggregates_check(void)1889 _dispatch_timer_aggregates_check(void)
1890 {
1891 	if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
1892 		return;
1893 	}
1894 	_dispatch_timer_aggregates_configure();
1895 }
1896 
1897 static void
_dispatch_timer_aggregates_register(dispatch_source_t ds)1898 _dispatch_timer_aggregates_register(dispatch_source_t ds)
1899 {
1900 	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1901 	if (!dta->dta_refcount++) {
1902 		TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
1903 	}
1904 }
1905 
1906 DISPATCH_NOINLINE
1907 static void
_dispatch_timer_aggregates_update(dispatch_source_t ds,unsigned int tidx)1908 _dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
1909 {
1910 	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1911 	dispatch_timer_source_aggregate_refs_t dr;
1912 	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1913 	_dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
1914 			dta->dta_timer, dr, dta_list);
1915 }
1916 
1917 DISPATCH_NOINLINE
1918 static void
_dispatch_timer_aggregates_unregister(dispatch_source_t ds,unsigned int tidx)1919 _dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
1920 {
1921 	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1922 	dispatch_timer_source_aggregate_refs_t dr;
1923 	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1924 	_dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
1925 			dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
1926 	if (!--dta->dta_refcount) {
1927 		TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
1928 	}
1929 }
1930 
1931 #pragma mark -
1932 #pragma mark dispatch_select
1933 
// Manager kqueue descriptor, created once by _dispatch_kq_init()
static int _dispatch_kq;

// Number of (fd, filter) registrations currently served by the select()
// fallback; non-zero makes the manager loop call _dispatch_mgr_select()
static unsigned int _dispatch_select_workaround;
// Descriptor sets passed (as copies) to select(); _dispatch_rfds also holds
// the kqueue descriptor itself when it is below FD_SETSIZE
static fd_set _dispatch_rfds;
static fd_set _dispatch_wfds;
// Lazily allocated FD_SETSIZE-entry tables, indexed by fd, holding the
// kevent udata registered for that fd (0 when the slot is unused)
static uint64_t*_dispatch_rfd_ptrs;
static uint64_t*_dispatch_wfd_ptrs;
1941 
1942 DISPATCH_NOINLINE
1943 static bool
_dispatch_select_register(struct kevent64_s * kev)1944 _dispatch_select_register(struct kevent64_s *kev)
1945 {
1946 
1947 	// Must execute on manager queue
1948 	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1949 
1950 	// If an EINVAL or ENOENT error occurred while adding/enabling a read or
1951 	// write kevent, assume it was due to a type of filedescriptor not
1952 	// supported by kqueue and fall back to select
1953 	switch (kev->filter) {
1954 	case EVFILT_READ:
1955 		if ((kev->data == EINVAL || kev->data == ENOENT) &&
1956 				dispatch_assume(kev->ident < FD_SETSIZE)) {
1957 			FD_SET((int)kev->ident, &_dispatch_rfds);
1958 			if (slowpath(!_dispatch_rfd_ptrs)) {
1959 				_dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1960 						sizeof(*_dispatch_rfd_ptrs));
1961 			}
1962 			if (!_dispatch_rfd_ptrs[kev->ident]) {
1963 				_dispatch_rfd_ptrs[kev->ident] = kev->udata;
1964 				_dispatch_select_workaround++;
1965 				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
1966 						(int)kev->ident, (long)kev->data);
1967 			}
1968 		}
1969 		return true;
1970 	case EVFILT_WRITE:
1971 		if ((kev->data == EINVAL || kev->data == ENOENT) &&
1972 				dispatch_assume(kev->ident < FD_SETSIZE)) {
1973 			FD_SET((int)kev->ident, &_dispatch_wfds);
1974 			if (slowpath(!_dispatch_wfd_ptrs)) {
1975 				_dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1976 						sizeof(*_dispatch_wfd_ptrs));
1977 			}
1978 			if (!_dispatch_wfd_ptrs[kev->ident]) {
1979 				_dispatch_wfd_ptrs[kev->ident] = kev->udata;
1980 				_dispatch_select_workaround++;
1981 				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
1982 						(int)kev->ident, (long)kev->data);
1983 			}
1984 		}
1985 		return true;
1986 	}
1987 	return false;
1988 }
1989 
1990 DISPATCH_NOINLINE
1991 static bool
_dispatch_select_unregister(const struct kevent64_s * kev)1992 _dispatch_select_unregister(const struct kevent64_s *kev)
1993 {
1994 	// Must execute on manager queue
1995 	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1996 
1997 	switch (kev->filter) {
1998 	case EVFILT_READ:
1999 		if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
2000 				_dispatch_rfd_ptrs[kev->ident]) {
2001 			FD_CLR((int)kev->ident, &_dispatch_rfds);
2002 			_dispatch_rfd_ptrs[kev->ident] = 0;
2003 			_dispatch_select_workaround--;
2004 			return true;
2005 		}
2006 		break;
2007 	case EVFILT_WRITE:
2008 		if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
2009 				_dispatch_wfd_ptrs[kev->ident]) {
2010 			FD_CLR((int)kev->ident, &_dispatch_wfds);
2011 			_dispatch_wfd_ptrs[kev->ident] = 0;
2012 			_dispatch_select_workaround--;
2013 			return true;
2014 		}
2015 		break;
2016 	}
2017 	return false;
2018 }
2019 
2020 DISPATCH_NOINLINE
2021 static bool
_dispatch_mgr_select(bool poll)2022 _dispatch_mgr_select(bool poll)
2023 {
2024 	static const struct timeval timeout_immediately = { 0, 0 };
2025 	fd_set tmp_rfds, tmp_wfds;
2026 	struct kevent64_s kev;
2027 	int err, i, r;
2028 	bool kevent_avail = false;
2029 
2030 	FD_COPY(&_dispatch_rfds, &tmp_rfds);
2031 	FD_COPY(&_dispatch_wfds, &tmp_wfds);
2032 
2033 	r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
2034 			poll ? (struct timeval*)&timeout_immediately : NULL);
2035 	if (slowpath(r == -1)) {
2036 		err = errno;
2037 		if (err != EBADF) {
2038 			if (err != EINTR) {
2039 				(void)dispatch_assume_zero(err);
2040 			}
2041 			return false;
2042 		}
2043 		for (i = 0; i < FD_SETSIZE; i++) {
2044 			if (i == _dispatch_kq) {
2045 				continue;
2046 			}
2047 			if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
2048 				continue;
2049 			}
2050 			r = dup(i);
2051 			if (dispatch_assume(r != -1)) {
2052 				close(r);
2053 			} else {
2054 				if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
2055 					FD_CLR(i, &_dispatch_rfds);
2056 					_dispatch_rfd_ptrs[i] = 0;
2057 					_dispatch_select_workaround--;
2058 				}
2059 				if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
2060 					FD_CLR(i, &_dispatch_wfds);
2061 					_dispatch_wfd_ptrs[i] = 0;
2062 					_dispatch_select_workaround--;
2063 				}
2064 			}
2065 		}
2066 		return false;
2067 	}
2068 	if (r > 0) {
2069 		for (i = 0; i < FD_SETSIZE; i++) {
2070 			if (FD_ISSET(i, &tmp_rfds)) {
2071 				if (i == _dispatch_kq) {
2072 					kevent_avail = true;
2073 					continue;
2074 				}
2075 				FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
2076 				EV_SET64(&kev, i, EVFILT_READ,
2077 						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2078 						_dispatch_rfd_ptrs[i], 0, 0);
2079 				_dispatch_kevent_drain(&kev);
2080 			}
2081 			if (FD_ISSET(i, &tmp_wfds)) {
2082 				FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
2083 				EV_SET64(&kev, i, EVFILT_WRITE,
2084 						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2085 						_dispatch_wfd_ptrs[i], 0, 0);
2086 				_dispatch_kevent_drain(&kev);
2087 			}
2088 		}
2089 	}
2090 	return kevent_avail;
2091 }
2092 
2093 #pragma mark -
2094 #pragma mark dispatch_kqueue
2095 
2096 static void
_dispatch_kq_init(void * context DISPATCH_UNUSED)2097 _dispatch_kq_init(void *context DISPATCH_UNUSED)
2098 {
2099 	static const struct kevent64_s kev = {
2100 		.ident = 1,
2101 		.filter = EVFILT_USER,
2102 		.flags = EV_ADD|EV_CLEAR,
2103 	};
2104 
2105 	_dispatch_safe_fork = false;
2106 #if DISPATCH_USE_GUARDED_FD
2107 	guardid_t guard = (uintptr_t)&kev;
2108 	_dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
2109 #else
2110 	_dispatch_kq = kqueue();
2111 #endif
2112 	if (_dispatch_kq == -1) {
2113 		int err = errno;
2114 		switch (err) {
2115 		case EMFILE:
2116 			DISPATCH_CLIENT_CRASH("kqueue() failure: "
2117 					"process is out of file descriptors");
2118 			break;
2119 		case ENFILE:
2120 			DISPATCH_CLIENT_CRASH("kqueue() failure: "
2121 					"system is out of file descriptors");
2122 			break;
2123 		case ENOMEM:
2124 			DISPATCH_CLIENT_CRASH("kqueue() failure: "
2125 					"kernel is out of memory");
2126 			break;
2127 		default:
2128 			(void)dispatch_assume_zero(err);
2129 			DISPATCH_CRASH("kqueue() failure");
2130 			break;
2131 		}
2132 	} else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2133 		// in case we fall back to select()
2134 		FD_SET(_dispatch_kq, &_dispatch_rfds);
2135 	}
2136 
2137 	(void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0,
2138 			NULL));
2139 	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q, 0);
2140 }
2141 
2142 static int
_dispatch_get_kq(void)2143 _dispatch_get_kq(void)
2144 {
2145 	static dispatch_once_t pred;
2146 
2147 	dispatch_once_f(&pred, NULL, _dispatch_kq_init);
2148 
2149 	return _dispatch_kq;
2150 }
2151 
2152 DISPATCH_NOINLINE
2153 static long
_dispatch_kq_update(const struct kevent64_s * kev)2154 _dispatch_kq_update(const struct kevent64_s *kev)
2155 {
2156 	int r;
2157 	struct kevent64_s kev_copy;
2158 
2159 	if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
2160 		if (_dispatch_select_unregister(kev)) {
2161 			return 0;
2162 		}
2163 	}
2164 	kev_copy = *kev;
2165 	// This ensures we don't get a pending kevent back while registering
2166 	// a new kevent
2167 	kev_copy.flags |= EV_RECEIPT;
2168 retry:
2169 	r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1,
2170 			&kev_copy, 1, 0, NULL));
2171 	if (slowpath(r == -1)) {
2172 		int err = errno;
2173 		switch (err) {
2174 		case EINTR:
2175 			goto retry;
2176 		case EBADF:
2177 			DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2178 			break;
2179 		default:
2180 			(void)dispatch_assume_zero(err);
2181 			break;
2182 		}
2183 		return err;
2184 	}
2185 	switch (kev_copy.data) {
2186 	case 0:
2187 		return 0;
2188 	case EBADF:
2189 	case EPERM:
2190 	case EINVAL:
2191 	case ENOENT:
2192 		if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
2193 			if (_dispatch_select_register(&kev_copy)) {
2194 				return 0;
2195 			}
2196 		}
2197 		// fall through
2198 	default:
2199 		kev_copy.flags |= kev->flags;
2200 		_dispatch_kevent_drain(&kev_copy);
2201 		break;
2202 	}
2203 	return (long)kev_copy.data;
2204 }
2205 
2206 #pragma mark -
2207 #pragma mark dispatch_mgr
2208 
// kevent to submit as the change list of the manager's next kevent64() call;
// reset to NULL by _dispatch_mgr_invoke() after each call.
static struct kevent64_s *_dispatch_kevent_enable;

// Schedules `ke` to be submitted with the manager's next kevent64() call.
// Only one distinct kevent may be pending at a time.
static inline void
_dispatch_mgr_kevent_reenable(struct kevent64_s *ke)
{
	dispatch_assert(_dispatch_kevent_enable == NULL ||
			_dispatch_kevent_enable == ke);
	_dispatch_kevent_enable = ke;
}
2217 
2218 unsigned long
_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)2219 _dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
2220 {
2221 	if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
2222 		return false;
2223 	}
2224 
2225 	static const struct kevent64_s kev = {
2226 		.ident = 1,
2227 		.filter = EVFILT_USER,
2228 		.fflags = NOTE_TRIGGER,
2229 	};
2230 
2231 #if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2232 	_dispatch_debug("waking up the dispatch manager queue: %p", dq);
2233 #endif
2234 
2235 	_dispatch_kq_update(&kev);
2236 
2237 	return false;
2238 }
2239 
// Set-up performed by the manager thread before it enters its event loop
// (called from _dispatch_mgr_thread()).
DISPATCH_NOINLINE
static void
_dispatch_mgr_init(void)
{
	// Bump dq_running on the manager queue (relaxed ordering)
	(void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
	// Make the manager queue the current queue of this thread
	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
	_dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
	// NOTE(review): the remaining calls initialize subsystems defined
	// elsewhere in the file; their ordering requirements are not visible here
	_dispatch_mgr_priority_init();
	_dispatch_kevent_init();
	_dispatch_timers_init();
	_dispatch_mach_recv_msg_buf_init();
	_dispatch_memorystatus_init();
}
2253 
2254 DISPATCH_NOINLINE DISPATCH_NORETURN
2255 static void
_dispatch_mgr_invoke(void)2256 _dispatch_mgr_invoke(void)
2257 {
2258 	static const struct timespec timeout_immediately = { 0, 0 };
2259 	struct kevent64_s kev;
2260 	bool poll;
2261 	int r;
2262 
2263 	for (;;) {
2264 		_dispatch_mgr_queue_drain();
2265 		poll = _dispatch_mgr_timers();
2266 		if (slowpath(_dispatch_select_workaround)) {
2267 			poll = _dispatch_mgr_select(poll);
2268 			if (!poll) continue;
2269 		}
2270 		poll = poll || _dispatch_queue_class_probe(&_dispatch_mgr_q);
2271 		r = kevent64(_dispatch_kq, _dispatch_kevent_enable,
2272 				_dispatch_kevent_enable ? 1 : 0, &kev, 1, 0,
2273 				poll ? &timeout_immediately : NULL);
2274 		_dispatch_kevent_enable = NULL;
2275 		if (slowpath(r == -1)) {
2276 			int err = errno;
2277 			switch (err) {
2278 			case EINTR:
2279 				break;
2280 			case EBADF:
2281 				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2282 				break;
2283 			default:
2284 				(void)dispatch_assume_zero(err);
2285 				break;
2286 			}
2287 		} else if (r) {
2288 			_dispatch_kevent_drain(&kev);
2289 		}
2290 	}
2291 }
2292 
// Entry point of the manager thread: initializes manager state and then
// enters the event loop, which never returns. The `dq` argument is unused.
DISPATCH_NORETURN
void
_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
{
	_dispatch_mgr_init();
	// never returns, so burn bridges behind us & clear stack 2k ahead
	_dispatch_clear_stack(2048);
	_dispatch_mgr_invoke();
}
2302 
2303 #pragma mark -
2304 #pragma mark dispatch_memorystatus
2305 
// Select the source type and event mask used by
// _dispatch_memorystatus_init(): the memorystatus source when available,
// otherwise the VM pressure source. Neither macro is defined when both
// features are disabled.
#if DISPATCH_USE_MEMORYSTATUS_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
		DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
		DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
#elif DISPATCH_USE_VM_PRESSURE_SOURCE
#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
#endif
2315 
2316 #if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2317 static dispatch_source_t _dispatch_memorystatus_source;
2318 
2319 static void
_dispatch_memorystatus_handler(void * context DISPATCH_UNUSED)2320 _dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
2321 {
2322 #if DISPATCH_USE_MEMORYSTATUS_SOURCE
2323 	unsigned long memorystatus;
2324 	memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source);
2325 	if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
2326 		_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
2327 		_voucher_activity_heap_pressure_normal();
2328 		return;
2329 	}
2330 	_dispatch_continuation_cache_limit =
2331 			DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
2332 	_voucher_activity_heap_pressure_warn();
2333 #endif
2334 	malloc_zone_pressure_relief(0,0);
2335 }
2336 
2337 static void
_dispatch_memorystatus_init(void)2338 _dispatch_memorystatus_init(void)
2339 {
2340 	_dispatch_memorystatus_source = dispatch_source_create(
2341 			DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
2342 			DISPATCH_MEMORYSTATUS_SOURCE_MASK,
2343 			_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT, true));
2344 	dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
2345 			_dispatch_memorystatus_handler);
2346 	dispatch_resume(_dispatch_memorystatus_source);
2347 }
2348 #else
// No memory-pressure source is available in this configuration: no-op stub
static inline void
_dispatch_memorystatus_init(void)
{
}
2350 #endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2351 
2352 #pragma mark -
2353 #pragma mark dispatch_mach
2354 
2355 #if HAVE_MACH
2356 
// Debug hook for Mach port names; compiles to a no-op that still evaluates
// its argument unless Mach port debugging is enabled.
#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
#define _dispatch_debug_machport(name) \
		dispatch_debug_machport((name), __func__)
#else
#define _dispatch_debug_machport(name) ((void)(name))
#endif

// Flags for all notifications that are registered/unregistered when a
// send-possible notification is requested/delivered
#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
		DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
// All receive-related flags (port-set based and direct)
#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
// Receive flags that select the direct-receive port set
// (see _dispatch_kevent_machport_resume)
#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2374 
// Non-zero when v is a non-zero power of two. Every use of the argument is
// parenthesized so that expression arguments (e.g. `a | b`) expand
// correctly. Note that v is still evaluated more than once.
#define _DISPATCH_IS_POWER_OF_TWO(v) (!((v) & ((v) - 1)) && (v))
// Reduces MACH_PORT_INDEX(x) to a value below y: a bit mask when y is a
// power of two, a modulo otherwise.
#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
		(MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))

// Bucket count used when hashing Mach port names
#define _DISPATCH_MACHPORT_HASH_SIZE 32
#define _DISPATCH_MACHPORT_HASH(x) \
		_DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
2382 
// Fallback definitions for headers that do not provide these receive options
#ifndef MACH_RCV_LARGE_IDENTITY
#define MACH_RCV_LARGE_IDENTITY 0x00000008
#endif
#ifndef MACH_RCV_VOUCHER
#define MACH_RCV_VOUCHER 0x00000800
#endif
// Trailer elements requested with every received message
#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
// Receive options used for the manager's Mach receive kevent. The whole
// expression, including MACH_RCV_VOUCHER, is enclosed in parentheses so the
// macro behaves as a single operand wherever it is used.
#define DISPATCH_MACH_RCV_OPTIONS ( \
		MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
		MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | \
		MACH_RCV_VOUCHER)
2395 
// Accessor for ext[0] of a dispatch kevent's embedded kevent
// NOTE(review): the name suggests ext[0] is used as an "armed" flag; the
// code that sets it is outside this section — confirm
#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])

// Forward declarations for Mach helpers defined later in this file
static void _dispatch_kevent_machport_drain(struct kevent64_s *ke);
static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke);
static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
		dispatch_source_refs_t dr, dispatch_kevent_t dk,
		mach_msg_header_t *hdr, mach_msg_size_t siz);
static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
		uint32_t new_flags, uint32_t del_flags, uint32_t mask,
		mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, bool disconnected);
static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);
static inline void _dispatch_mach_msg_set_options(dispatch_object_t dou,
		mach_msg_option_t options);
static void _dispatch_mach_msg_recv(dispatch_mach_t dm,
		dispatch_mach_reply_refs_t dmr, mach_msg_header_t *hdr,
		mach_msg_size_t siz);
static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
		const struct kevent64_s *ke);
static inline mach_msg_option_t _dispatch_mach_checkin_options(void);
2420 
// Payload and trailer sizes used to size the manager's receive buffer
// (see _dispatch_mach_recv_msg_buf_init)
static const size_t _dispatch_mach_recv_msg_size =
		DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
static const size_t dispatch_mach_trailer_size =
		sizeof(dispatch_mach_trailer_t);
// Page-rounded size of the receive buffer
static mach_msg_size_t _dispatch_mach_recv_msg_buf_size;
// Port sets created lazily by the *_portset_init() functions below
static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
// Receive right backing _dispatch_mach_notify_source
static mach_port_t _dispatch_mach_notify_port;
// kevent registered for the direct-receive port set. ident is set to the
// port set, ext[0] to the receive buffer address and ext[1] to its size.
static struct kevent64_s _dispatch_mach_recv_kevent = {
	.filter = EVFILT_MACHPORT,
	.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
	.fflags = DISPATCH_MACH_RCV_OPTIONS,
};
static dispatch_source_t _dispatch_mach_notify_source;
// Source type used to create _dispatch_mach_notify_source
static const
struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
	.ke = {
		.filter = EVFILT_MACHPORT,
		.flags = EV_CLEAR,
		.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
	},
};
2442 
2443 static void
_dispatch_mach_recv_msg_buf_init(void)2444 _dispatch_mach_recv_msg_buf_init(void)
2445 {
2446 	mach_vm_size_t vm_size = mach_vm_round_page(
2447 			_dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
2448 	_dispatch_mach_recv_msg_buf_size = (mach_msg_size_t)vm_size;
2449 	mach_vm_address_t vm_addr = vm_page_size;
2450 	kern_return_t kr;
2451 
2452 	while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
2453 			VM_FLAGS_ANYWHERE))) {
2454 		if (kr != KERN_NO_SPACE) {
2455 			(void)dispatch_assume_zero(kr);
2456 			DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
2457 		}
2458 		_dispatch_temporary_resource_shortage();
2459 		vm_addr = vm_page_size;
2460 	}
2461 	_dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
2462 	_dispatch_mach_recv_kevent.ext[1] = vm_size;
2463 }
2464 
2465 static inline void*
_dispatch_get_mach_recv_msg_buf(void)2466 _dispatch_get_mach_recv_msg_buf(void)
2467 {
2468 	return (void*)_dispatch_mach_recv_kevent.ext[0];
2469 }
2470 
2471 static void
_dispatch_mach_recv_portset_init(void * context DISPATCH_UNUSED)2472 _dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
2473 {
2474 	kern_return_t kr;
2475 
2476 	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2477 			&_dispatch_mach_recv_portset);
2478 	DISPATCH_VERIFY_MIG(kr);
2479 	if (dispatch_assume_zero(kr)) {
2480 		DISPATCH_CLIENT_CRASH(
2481 				"mach_port_allocate() failed: cannot create port set");
2482 	}
2483 	dispatch_assert(_dispatch_get_mach_recv_msg_buf());
2484 	dispatch_assert(dispatch_mach_trailer_size ==
2485 			REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
2486 			DISPATCH_MACH_RCV_TRAILER)));
2487 	_dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
2488 	_dispatch_kq_update(&_dispatch_mach_recv_kevent);
2489 
2490 	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2491 			&_dispatch_mach_notify_port);
2492 	DISPATCH_VERIFY_MIG(kr);
2493 	if (dispatch_assume_zero(kr)) {
2494 		DISPATCH_CLIENT_CRASH(
2495 				"mach_port_allocate() failed: cannot create receive right");
2496 	}
2497 	_dispatch_mach_notify_source = dispatch_source_create(
2498 			&_dispatch_source_type_mach_recv_direct,
2499 			_dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
2500 	static const struct dispatch_continuation_s dc = {
2501 		.dc_func = (void*)_dispatch_mach_notify_source_invoke,
2502 	};
2503 	_dispatch_mach_notify_source->ds_refs->ds_handler[DS_EVENT_HANDLER] =
2504 			(dispatch_continuation_t)&dc;
2505 	dispatch_assert(_dispatch_mach_notify_source);
2506 	dispatch_resume(_dispatch_mach_notify_source);
2507 }
2508 
2509 static mach_port_t
_dispatch_get_mach_recv_portset(void)2510 _dispatch_get_mach_recv_portset(void)
2511 {
2512 	static dispatch_once_t pred;
2513 	dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
2514 	return _dispatch_mach_recv_portset;
2515 }
2516 
2517 static void
_dispatch_mach_portset_init(void * context DISPATCH_UNUSED)2518 _dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
2519 {
2520 	struct kevent64_s kev = {
2521 		.filter = EVFILT_MACHPORT,
2522 		.flags = EV_ADD,
2523 	};
2524 	kern_return_t kr;
2525 
2526 	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2527 			&_dispatch_mach_portset);
2528 	DISPATCH_VERIFY_MIG(kr);
2529 	if (dispatch_assume_zero(kr)) {
2530 		DISPATCH_CLIENT_CRASH(
2531 				"mach_port_allocate() failed: cannot create port set");
2532 	}
2533 	kev.ident = _dispatch_mach_portset;
2534 	_dispatch_kq_update(&kev);
2535 }
2536 
2537 static mach_port_t
_dispatch_get_mach_portset(void)2538 _dispatch_get_mach_portset(void)
2539 {
2540 	static dispatch_once_t pred;
2541 	dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
2542 	return _dispatch_mach_portset;
2543 }
2544 
2545 static kern_return_t
_dispatch_mach_portset_update(dispatch_kevent_t dk,mach_port_t mps)2546 _dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
2547 {
2548 	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
2549 	kern_return_t kr;
2550 
2551 	_dispatch_debug_machport(mp);
2552 	kr = mach_port_move_member(mach_task_self(), mp, mps);
2553 	if (slowpath(kr)) {
2554 		DISPATCH_VERIFY_MIG(kr);
2555 		switch (kr) {
2556 		case KERN_INVALID_RIGHT:
2557 			if (mps) {
2558 				_dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
2559 						"mach_port_move_member() failed ", kr);
2560 				break;
2561 			}
2562 			//fall through
2563 		case KERN_INVALID_NAME:
2564 #if DISPATCH_DEBUG
2565 			_dispatch_log("Corruption: Mach receive right 0x%x destroyed "
2566 					"prematurely", mp);
2567 #endif
2568 			break;
2569 		default:
2570 			(void)dispatch_assume_zero(kr);
2571 			break;
2572 		}
2573 	}
2574 	return mps ? kr : 0;
2575 }
2576 
2577 static void
_dispatch_kevent_mach_recv_reenable(struct kevent64_s * ke DISPATCH_UNUSED)2578 _dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED)
2579 {
2580 #if (TARGET_IPHONE_SIMULATOR && \
2581 		IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
2582 		(!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
2583 	// delete and re-add kevent to workaround <rdar://problem/13924256>
2584 	if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
2585 		struct kevent64_s kev = _dispatch_mach_recv_kevent;
2586 		kev.flags = EV_DELETE;
2587 		_dispatch_kq_update(&kev);
2588 	}
2589 #endif
2590 	_dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
2591 }
2592 
2593 static kern_return_t
_dispatch_kevent_machport_resume(dispatch_kevent_t dk,uint32_t new_flags,uint32_t del_flags)2594 _dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
2595 		uint32_t del_flags)
2596 {
2597 	kern_return_t kr = 0;
2598 	dispatch_assert_zero(new_flags & del_flags);
2599 	if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
2600 			(del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
2601 		mach_port_t mps;
2602 		if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2603 			mps = _dispatch_get_mach_recv_portset();
2604 		} else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
2605 				((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
2606 				(dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
2607 			mps = _dispatch_get_mach_portset();
2608 		} else {
2609 			mps = MACH_PORT_NULL;
2610 		}
2611 		kr = _dispatch_mach_portset_update(dk, mps);
2612 	}
2613 	return kr;
2614 }
2615 
2616 static kern_return_t
_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,uint32_t new_flags,uint32_t del_flags)2617 _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
2618 		uint32_t del_flags)
2619 {
2620 	kern_return_t kr = 0;
2621 	dispatch_assert_zero(new_flags & del_flags);
2622 	if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
2623 			(del_flags & _DISPATCH_MACH_SP_FLAGS)) {
2624 		// Requesting a (delayed) non-sync send-possible notification
2625 		// registers for both immediate dead-name notification and delayed-arm
2626 		// send-possible notification for the port.
2627 		// The send-possible notification is armed when a mach_msg() with the
2628 		// the MACH_SEND_NOTIFY to the port times out.
2629 		// If send-possible is unavailable, fall back to immediate dead-name
2630 		// registration rdar://problem/2527840&9008724
2631 		kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
2632 				_DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
2633 				MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
2634 	}
2635 	return kr;
2636 }
2637 
2638 static inline void
_dispatch_kevent_mach_portset(struct kevent64_s * ke)2639 _dispatch_kevent_mach_portset(struct kevent64_s *ke)
2640 {
2641 	if (ke->ident == _dispatch_mach_recv_portset) {
2642 		return _dispatch_kevent_mach_msg_drain(ke);
2643 	} else if (ke->ident == _dispatch_mach_portset) {
2644 		return _dispatch_kevent_machport_drain(ke);
2645 	} else {
2646 		return _dispatch_kevent_error(ke);
2647 	}
2648 }
2649 
2650 DISPATCH_NOINLINE
2651 static void
_dispatch_kevent_machport_drain(struct kevent64_s * ke)2652 _dispatch_kevent_machport_drain(struct kevent64_s *ke)
2653 {
2654 	mach_port_t name = (mach_port_name_t)ke->data;
2655 	dispatch_kevent_t dk;
2656 	struct kevent64_s kev;
2657 
2658 	_dispatch_debug_machport(name);
2659 	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2660 	if (!dispatch_assume(dk)) {
2661 		return;
2662 	}
2663 	_dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
2664 
2665 	EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
2666 			DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
2667 	_dispatch_kevent_debug(&kev, __func__);
2668 	_dispatch_kevent_merge(&kev);
2669 }
2670 
2671 DISPATCH_NOINLINE
2672 static void
_dispatch_kevent_mach_msg_drain(struct kevent64_s * ke)2673 _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
2674 {
2675 	mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
2676 	mach_msg_size_t siz, msgsiz;
2677 	mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
2678 
2679 	_dispatch_kevent_mach_recv_reenable(ke);
2680 	if (!dispatch_assume(hdr)) {
2681 		DISPATCH_CRASH("EVFILT_MACHPORT with no message");
2682 	}
2683 	if (fastpath(!kr)) {
2684 		return _dispatch_kevent_mach_msg_recv(hdr);
2685 	} else if (kr != MACH_RCV_TOO_LARGE) {
2686 		goto out;
2687 	}
2688 	if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
2689 			dispatch_mach_trailer_size)) {
2690 		DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
2691 	}
2692 	siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
2693 	hdr = malloc(siz);
2694 	if (ke->data) {
2695 		if (!dispatch_assume(hdr)) {
2696 			// Kernel will discard message too large to fit
2697 			hdr = _dispatch_get_mach_recv_msg_buf();
2698 			siz = _dispatch_mach_recv_msg_buf_size;
2699 		}
2700 		mach_port_t name = (mach_port_name_t)ke->data;
2701 		const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
2702 				MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
2703 		kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
2704 				MACH_PORT_NULL);
2705 		if (fastpath(!kr)) {
2706 			return _dispatch_kevent_mach_msg_recv(hdr);
2707 		} else if (kr == MACH_RCV_TOO_LARGE) {
2708 			_dispatch_log("BUG in libdispatch client: "
2709 					"_dispatch_kevent_mach_msg_drain: dropped message too "
2710 					"large to fit in memory: id = 0x%x, size = %zd",
2711 					hdr->msgh_id, ke->ext[1]);
2712 			kr = MACH_MSG_SUCCESS;
2713 		}
2714 	} else {
2715 		// We don't know which port in the portset contains the large message,
2716 		// so need to receive all messages pending on the portset to ensure the
2717 		// large message is drained. <rdar://problem/13950432>
2718 		bool received = false;
2719 		for (;;) {
2720 			if (!dispatch_assume(hdr)) {
2721 				DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
2722 			}
2723 			const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
2724 					MACH_RCV_TIMEOUT);
2725 			kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
2726 					MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
2727 			if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
2728 					hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
2729 				DISPATCH_CRASH("Overlarge message");
2730 			}
2731 			if (fastpath(!kr)) {
2732 				msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
2733 				if (msgsiz < siz) {
2734 					void *shrink = realloc(hdr, msgsiz);
2735 					if (shrink) hdr = shrink;
2736 				}
2737 				_dispatch_kevent_mach_msg_recv(hdr);
2738 				hdr = NULL;
2739 				received = true;
2740 			} else if (kr == MACH_RCV_TOO_LARGE) {
2741 				siz = hdr->msgh_size + dispatch_mach_trailer_size;
2742 			} else {
2743 				if (kr == MACH_RCV_TIMED_OUT && received) {
2744 					kr = MACH_MSG_SUCCESS;
2745 				}
2746 				break;
2747 			}
2748 			hdr = reallocf(hdr, siz);
2749 		}
2750 	}
2751 	if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2752 		free(hdr);
2753 	}
2754 out:
2755 	if (slowpath(kr)) {
2756 		_dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
2757 				"message reception failed", kr);
2758 	}
2759 }
2760 
2761 static void
_dispatch_kevent_mach_msg_recv(mach_msg_header_t * hdr)2762 _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
2763 {
2764 	dispatch_source_refs_t dri;
2765 	dispatch_kevent_t dk;
2766 	mach_port_t name = hdr->msgh_local_port;
2767 	mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
2768 
2769 	if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
2770 			dispatch_mach_trailer_size)) {
2771 		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2772 				"received overlarge message");
2773 		return _dispatch_kevent_mach_msg_destroy(hdr);
2774 	}
2775 	if (!dispatch_assume(name)) {
2776 		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2777 				"received message with MACH_PORT_NULL port");
2778 		return _dispatch_kevent_mach_msg_destroy(hdr);
2779 	}
2780 	_dispatch_debug_machport(name);
2781 	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2782 	if (!dispatch_assume(dk)) {
2783 		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2784 				"received message with unknown kevent");
2785 		return _dispatch_kevent_mach_msg_destroy(hdr);
2786 	}
2787 	_dispatch_kevent_debug(&dk->dk_kevent, __func__);
2788 	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
2789 		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2790 		if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2791 			return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
2792 		}
2793 	}
2794 	_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2795 			"received message with no listeners");
2796 	return _dispatch_kevent_mach_msg_destroy(hdr);
2797 }
2798 
2799 static void
_dispatch_kevent_mach_msg_destroy(mach_msg_header_t * hdr)2800 _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
2801 {
2802 	if (hdr) {
2803 		mach_msg_destroy(hdr);
2804 		if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2805 			free(hdr);
2806 		}
2807 	}
2808 }
2809 
2810 static void
_dispatch_source_merge_mach_msg(dispatch_source_t ds,dispatch_source_refs_t dr,dispatch_kevent_t dk,mach_msg_header_t * hdr,mach_msg_size_t siz)2811 _dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
2812 		dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
2813 {
2814 	if (ds == _dispatch_mach_notify_source) {
2815 		_dispatch_mach_notify_source_invoke(hdr);
2816 		return _dispatch_kevent_mach_msg_destroy(hdr);
2817 	}
2818 	dispatch_mach_reply_refs_t dmr = NULL;
2819 	if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
2820 		dmr = (dispatch_mach_reply_refs_t)dr;
2821 	}
2822 	return _dispatch_mach_msg_recv((dispatch_mach_t)ds, dmr, hdr, siz);
2823 }
2824 
2825 DISPATCH_ALWAYS_INLINE
2826 static inline void
_dispatch_mach_notify_merge(mach_port_t name,uint32_t flag,bool final)2827 _dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
2828 {
2829 	dispatch_source_refs_t dri, dr_next;
2830 	dispatch_kevent_t dk;
2831 	struct kevent64_s kev;
2832 	bool unreg;
2833 
2834 	dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
2835 	if (!dk) {
2836 		return;
2837 	}
2838 
2839 	// Update notification registration state.
2840 	dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
2841 	EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
2842 			flag, 0, (uintptr_t)dk, 0, 0);
2843 	if (final) {
2844 		// This can never happen again
2845 		unreg = true;
2846 	} else {
2847 		// Re-register for notification before delivery
2848 		unreg = _dispatch_kevent_resume(dk, flag, 0);
2849 	}
2850 	DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
2851 	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
2852 		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2853 		if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
2854 			dispatch_mach_t dm = (dispatch_mach_t)dsi;
2855 			_dispatch_mach_merge_kevent(dm, &kev);
2856 			if (unreg && dm->dm_dkev) {
2857 				_dispatch_mach_kevent_unregister(dm);
2858 			}
2859 		} else {
2860 			_dispatch_source_merge_kevent(dsi, &kev);
2861 			if (unreg) {
2862 				_dispatch_source_kevent_unregister(dsi);
2863 			}
2864 		}
2865 		if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
2866 			// current merge is last in list (dk might have been freed)
2867 			// or it re-armed the notification
2868 			return;
2869 		}
2870 	}
2871 }
2872 
2873 static kern_return_t
_dispatch_mach_notify_update(dispatch_kevent_t dk,uint32_t new_flags,uint32_t del_flags,uint32_t mask,mach_msg_id_t notify_msgid,mach_port_mscount_t notify_sync)2874 _dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
2875 		uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
2876 		mach_port_mscount_t notify_sync)
2877 {
2878 	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
2879 	typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
2880 	kern_return_t kr, krr = 0;
2881 
2882 	// Update notification registration state.
2883 	dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
2884 	dk->dk_kevent.data &= ~(del_flags & mask);
2885 
2886 	_dispatch_debug_machport(port);
2887 	if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
2888 		// initialize _dispatch_mach_notify_port:
2889 		(void)_dispatch_get_mach_recv_portset();
2890 		_dispatch_debug("machport[0x%08x]: registering for send-possible "
2891 				"notification", port);
2892 		previous = MACH_PORT_NULL;
2893 		krr = mach_port_request_notification(mach_task_self(), port,
2894 				notify_msgid, notify_sync, _dispatch_mach_notify_port,
2895 				MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
2896 		DISPATCH_VERIFY_MIG(krr);
2897 
2898 		switch(krr) {
2899 		case KERN_INVALID_NAME:
2900 		case KERN_INVALID_RIGHT:
2901 			// Supress errors & clear registration state
2902 			dk->dk_kevent.data &= ~mask;
2903 			break;
2904 		default:
2905 			// Else, we dont expect any errors from mach. Log any errors
2906 			if (dispatch_assume_zero(krr)) {
2907 				// log the error & clear registration state
2908 				dk->dk_kevent.data &= ~mask;
2909 			} else if (dispatch_assume_zero(previous)) {
2910 				// Another subsystem has beat libdispatch to requesting the
2911 				// specified Mach notification on this port. We should
2912 				// technically cache the previous port and message it when the
2913 				// kernel messages our port. Or we can just say screw those
2914 				// subsystems and deallocate the previous port.
2915 				// They should adopt libdispatch :-P
2916 				kr = mach_port_deallocate(mach_task_self(), previous);
2917 				DISPATCH_VERIFY_MIG(kr);
2918 				(void)dispatch_assume_zero(kr);
2919 				previous = MACH_PORT_NULL;
2920 			}
2921 		}
2922 	} else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
2923 		_dispatch_debug("machport[0x%08x]: unregistering for send-possible "
2924 				"notification", port);
2925 		previous = MACH_PORT_NULL;
2926 		kr = mach_port_request_notification(mach_task_self(), port,
2927 				notify_msgid, notify_sync, MACH_PORT_NULL,
2928 				MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
2929 		DISPATCH_VERIFY_MIG(kr);
2930 
2931 		switch (kr) {
2932 		case KERN_INVALID_NAME:
2933 		case KERN_INVALID_RIGHT:
2934 		case KERN_INVALID_ARGUMENT:
2935 			break;
2936 		default:
2937 			if (dispatch_assume_zero(kr)) {
2938 				// log the error
2939 			}
2940 		}
2941 	} else {
2942 		return 0;
2943 	}
2944 	if (slowpath(previous)) {
2945 		// the kernel has not consumed the send-once right yet
2946 		(void)dispatch_assume_zero(
2947 				_dispatch_send_consume_send_once_right(previous));
2948 	}
2949 	return krr;
2950 }
2951 
2952 static void
_dispatch_mach_host_notify_update(void * context DISPATCH_UNUSED)2953 _dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
2954 {
2955 	(void)_dispatch_get_mach_recv_portset();
2956 	_dispatch_debug("registering for calendar-change notification");
2957 	kern_return_t kr = host_request_notification(_dispatch_get_mach_host_port(),
2958 			HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
2959 	DISPATCH_VERIFY_MIG(kr);
2960 	(void)dispatch_assume_zero(kr);
2961 }
2962 
2963 static void
_dispatch_mach_host_calendar_change_register(void)2964 _dispatch_mach_host_calendar_change_register(void)
2965 {
2966 	static dispatch_once_t pred;
2967 	dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
2968 }
2969 
2970 static void
_dispatch_mach_notify_source_invoke(mach_msg_header_t * hdr)2971 _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
2972 {
2973 	mig_reply_error_t reply;
2974 	dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
2975 		__ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
2976 	dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
2977 	boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
2978 	if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
2979 		// host_notify_reply.defs: host_calendar_changed
2980 		_dispatch_debug("calendar-change notification");
2981 		_dispatch_timers_calendar_change();
2982 		_dispatch_mach_host_notify_update(NULL);
2983 		success = TRUE;
2984 		reply.RetCode = KERN_SUCCESS;
2985 	}
2986 	if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
2987 		(void)dispatch_assume_zero(reply.RetCode);
2988 	}
2989 }
2990 
2991 kern_return_t
_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,mach_port_name_t name)2992 _dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
2993 		mach_port_name_t name)
2994 {
2995 #if DISPATCH_DEBUG
2996 	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
2997 			"deleted prematurely", name);
2998 #endif
2999 
3000 	_dispatch_debug_machport(name);
3001 	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
3002 
3003 	return KERN_SUCCESS;
3004 }
3005 
3006 kern_return_t
_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,mach_port_name_t name)3007 _dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
3008 		mach_port_name_t name)
3009 {
3010 	kern_return_t kr;
3011 
3012 	_dispatch_debug("machport[0x%08x]: dead-name notification", name);
3013 	_dispatch_debug_machport(name);
3014 	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
3015 
3016 	// the act of receiving a dead name notification allocates a dead-name
3017 	// right that must be deallocated
3018 	kr = mach_port_deallocate(mach_task_self(), name);
3019 	DISPATCH_VERIFY_MIG(kr);
3020 	//(void)dispatch_assume_zero(kr);
3021 
3022 	return KERN_SUCCESS;
3023 }
3024 
3025 kern_return_t
_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,mach_port_name_t name)3026 _dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
3027 		mach_port_name_t name)
3028 {
3029 	_dispatch_debug("machport[0x%08x]: send-possible notification", name);
3030 	_dispatch_debug_machport(name);
3031 	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
3032 
3033 	return KERN_SUCCESS;
3034 }
3035 
3036 #pragma mark -
3037 #pragma mark dispatch_mach_t
3038 
3039 #define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
3040 #define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
3041 #define DISPATCH_MACH_OPTIONS_MASK 0xffff
3042 
3043 static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
3044 static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
3045 		mach_port_t local_port, mach_port_t remote_port);
3046 static dispatch_mach_msg_t _dispatch_mach_msg_create_reply_disconnected(
3047 		dispatch_object_t dou, dispatch_mach_reply_refs_t dmr);
3048 static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
3049 		dispatch_object_t dou);
3050 static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
3051 		dispatch_mach_msg_t dmsg);
3052 static void _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3053 		pthread_priority_t pp);
3054 
3055 static dispatch_mach_t
_dispatch_mach_create(const char * label,dispatch_queue_t q,void * context,dispatch_mach_handler_function_t handler,bool handler_is_block)3056 _dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
3057 		dispatch_mach_handler_function_t handler, bool handler_is_block)
3058 {
3059 	dispatch_mach_t dm;
3060 	dispatch_mach_refs_t dr;
3061 
3062 	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
3063 			sizeof(struct dispatch_mach_s));
3064 	_dispatch_queue_init((dispatch_queue_t)dm);
3065 	dm->dq_label = label;
3066 
3067 	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
3068 	dm->do_ref_cnt++; // since channel is created suspended
3069 	dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
3070 	dm->do_targetq = &_dispatch_mgr_q;
3071 
3072 	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
3073 	dr->dr_source_wref = _dispatch_ptr2wref(dm);
3074 	dr->dm_handler_func = handler;
3075 	dr->dm_handler_ctxt = context;
3076 	dm->ds_refs = dr;
3077 	dm->dm_handler_is_block = handler_is_block;
3078 
3079 	dm->dm_refs = _dispatch_calloc(1ul,
3080 			sizeof(struct dispatch_mach_send_refs_s));
3081 	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
3082 	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
3083 	TAILQ_INIT(&dm->dm_refs->dm_replies);
3084 
3085 	// First item on the channel sets the user-specified target queue
3086 	dispatch_set_target_queue(dm, q);
3087 	_dispatch_object_debug(dm, "%s", __func__);
3088 	return dm;
3089 }
3090 
3091 dispatch_mach_t
dispatch_mach_create(const char * label,dispatch_queue_t q,dispatch_mach_handler_t handler)3092 dispatch_mach_create(const char *label, dispatch_queue_t q,
3093 		dispatch_mach_handler_t handler)
3094 {
3095 	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
3096 	return _dispatch_mach_create(label, q, bb,
3097 			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
3098 }
3099 
3100 dispatch_mach_t
dispatch_mach_create_f(const char * label,dispatch_queue_t q,void * context,dispatch_mach_handler_function_t handler)3101 dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
3102 		dispatch_mach_handler_function_t handler)
3103 {
3104 	return _dispatch_mach_create(label, q, context, handler, false);
3105 }
3106 
3107 void
_dispatch_mach_dispose(dispatch_mach_t dm)3108 _dispatch_mach_dispose(dispatch_mach_t dm)
3109 {
3110 	_dispatch_object_debug(dm, "%s", __func__);
3111 	dispatch_mach_refs_t dr = dm->ds_refs;
3112 	if (dm->dm_handler_is_block && dr->dm_handler_ctxt) {
3113 		Block_release(dr->dm_handler_ctxt);
3114 	}
3115 	free(dr);
3116 	free(dm->dm_refs);
3117 	_dispatch_queue_destroy(dm);
3118 }
3119 
3120 void
dispatch_mach_connect(dispatch_mach_t dm,mach_port_t receive,mach_port_t send,dispatch_mach_msg_t checkin)3121 dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
3122 		mach_port_t send, dispatch_mach_msg_t checkin)
3123 {
3124 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3125 	dispatch_kevent_t dk;
3126 
3127 	if (MACH_PORT_VALID(receive)) {
3128 		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3129 		dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3130 		dk->dk_kevent.ident = receive;
3131 		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3132 		dk->dk_kevent.udata = (uintptr_t)dk;
3133 		TAILQ_INIT(&dk->dk_sources);
3134 		dm->ds_dkev = dk;
3135 		dm->ds_pending_data_mask = dk->dk_kevent.fflags;
3136 		_dispatch_retain(dm); // the reference the manager queue holds
3137 	}
3138 	dr->dm_send = send;
3139 	if (MACH_PORT_VALID(send)) {
3140 		if (checkin) {
3141 			dispatch_retain(checkin);
3142 			mach_msg_option_t options = _dispatch_mach_checkin_options();
3143 			_dispatch_mach_msg_set_options(checkin, options);
3144 			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3145 		}
3146 		dr->dm_checkin = checkin;
3147 	}
3148 	// monitor message reply ports
3149 	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3150 	if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
3151 			DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
3152 		DISPATCH_CLIENT_CRASH("Channel already connected");
3153 	}
3154 	_dispatch_object_debug(dm, "%s", __func__);
3155 	return dispatch_resume(dm);
3156 }
3157 
3158 DISPATCH_NOINLINE
3159 static void
_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,dispatch_mach_reply_refs_t dmr,bool disconnected)3160 _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
3161 		dispatch_mach_reply_refs_t dmr, bool disconnected)
3162 {
3163 	dispatch_mach_msg_t dmsgr = NULL;
3164 	if (disconnected) {
3165 		dmsgr = _dispatch_mach_msg_create_reply_disconnected(NULL, dmr);
3166 	}
3167 	dispatch_kevent_t dk = dmr->dmr_dkev;
3168 	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
3169 	_dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
3170 	TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dmr_list);
3171 	if (dmr->dmr_voucher) _voucher_release(dmr->dmr_voucher);
3172 	free(dmr);
3173 	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3174 }
3175 
3176 DISPATCH_NOINLINE
3177 static void
_dispatch_mach_reply_kevent_register(dispatch_mach_t dm,mach_port_t reply,dispatch_mach_msg_t dmsg)3178 _dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
3179 		dispatch_mach_msg_t dmsg)
3180 {
3181 	dispatch_kevent_t dk;
3182 	dispatch_mach_reply_refs_t dmr;
3183 
3184 	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3185 	dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3186 	dk->dk_kevent.ident = reply;
3187 	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3188 	dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3189 	dk->dk_kevent.udata = (uintptr_t)dk;
3190 	TAILQ_INIT(&dk->dk_sources);
3191 
3192 	dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
3193 	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
3194 	dmr->dmr_dkev = dk;
3195 	if (dmsg->dmsg_voucher) {
3196 		dmr->dmr_voucher =_voucher_retain(dmsg->dmsg_voucher);
3197 	}
3198 	dmr->dmr_priority = dmsg->dmsg_priority;
3199 	// make reply context visible to leaks rdar://11777199
3200 	dmr->dmr_ctxt = dmsg->do_ctxt;
3201 
3202 	_dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
3203 			dmsg->do_ctxt);
3204 	uint32_t flags;
3205 	bool do_resume = _dispatch_kevent_register(&dmr->dmr_dkev, &flags);
3206 	TAILQ_INSERT_TAIL(&dmr->dmr_dkev->dk_sources, (dispatch_source_refs_t)dmr,
3207 			dr_list);
3208 	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dmr_list);
3209 	if (do_resume && _dispatch_kevent_resume(dmr->dmr_dkev, flags, 0)) {
3210 		_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3211 	}
3212 }
3213 
3214 DISPATCH_NOINLINE
3215 static void
_dispatch_mach_kevent_unregister(dispatch_mach_t dm)3216 _dispatch_mach_kevent_unregister(dispatch_mach_t dm)
3217 {
3218 	dispatch_kevent_t dk = dm->dm_dkev;
3219 	dm->dm_dkev = NULL;
3220 	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
3221 			dr_list);
3222 	dm->ds_pending_data_mask &= ~(unsigned long)
3223 			(DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3224 	_dispatch_kevent_unregister(dk,
3225 			DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3226 }
3227 
3228 DISPATCH_NOINLINE
3229 static void
_dispatch_mach_kevent_register(dispatch_mach_t dm,mach_port_t send)3230 _dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
3231 {
3232 	dispatch_kevent_t dk;
3233 
3234 	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3235 	dk->dk_kevent = _dispatch_source_type_mach_send.ke;
3236 	dk->dk_kevent.ident = send;
3237 	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3238 	dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
3239 	dk->dk_kevent.udata = (uintptr_t)dk;
3240 	TAILQ_INIT(&dk->dk_sources);
3241 
3242 	dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
3243 
3244 	uint32_t flags;
3245 	bool do_resume = _dispatch_kevent_register(&dk, &flags);
3246 	TAILQ_INSERT_TAIL(&dk->dk_sources,
3247 			(dispatch_source_refs_t)dm->dm_refs, dr_list);
3248 	dm->dm_dkev = dk;
3249 	if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
3250 		_dispatch_mach_kevent_unregister(dm);
3251 	}
3252 }
3253 
3254 static inline void
_dispatch_mach_push(dispatch_object_t dm,dispatch_object_t dou,pthread_priority_t pp)3255 _dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou,
3256 		pthread_priority_t pp)
3257 {
3258 	return _dispatch_queue_push(dm._dq, dou, pp);
3259 }
3260 
3261 static inline void
_dispatch_mach_msg_set_options(dispatch_object_t dou,mach_msg_option_t options)3262 _dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
3263 {
3264 	dou._do->do_suspend_cnt = (unsigned int)options;
3265 }
3266 
3267 static inline mach_msg_option_t
_dispatch_mach_msg_get_options(dispatch_object_t dou)3268 _dispatch_mach_msg_get_options(dispatch_object_t dou)
3269 {
3270 	mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
3271 	return options;
3272 }
3273 
3274 static inline void
_dispatch_mach_msg_set_reason(dispatch_object_t dou,mach_error_t err,unsigned long reason)3275 _dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
3276 		unsigned long reason)
3277 {
3278 	dispatch_assert_zero(reason & ~(unsigned long)code_emask);
3279 	dou._do->do_suspend_cnt =  (unsigned int)((err || !reason) ? err :
3280 			 err_local|err_sub(0x3e0)|(mach_error_t)reason);
3281 }
3282 
3283 static inline unsigned long
_dispatch_mach_msg_get_reason(dispatch_object_t dou,mach_error_t * err_ptr)3284 _dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
3285 {
3286 	mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
3287 	dou._do->do_suspend_cnt = 0;
3288 	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
3289 		*err_ptr = 0;
3290 		return err_get_code(err);
3291 	}
3292 	*err_ptr = err;
3293 	return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
3294 }
3295 
3296 static void
_dispatch_mach_msg_recv(dispatch_mach_t dm,dispatch_mach_reply_refs_t dmr,mach_msg_header_t * hdr,mach_msg_size_t siz)3297 _dispatch_mach_msg_recv(dispatch_mach_t dm, dispatch_mach_reply_refs_t dmr,
3298 		mach_msg_header_t *hdr, mach_msg_size_t siz)
3299 {
3300 	_dispatch_debug_machport(hdr->msgh_remote_port);
3301 	_dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
3302 			hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
3303 	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3304 		return _dispatch_kevent_mach_msg_destroy(hdr);
3305 	}
3306 	dispatch_mach_msg_t dmsg;
3307 	voucher_t voucher;
3308 	pthread_priority_t priority;
3309 	void *ctxt = NULL;
3310 	if (dmr) {
3311 		_voucher_mach_msg_clear(hdr, false); // deallocate reply message voucher
3312 		voucher = dmr->dmr_voucher;
3313 		dmr->dmr_voucher = NULL; // transfer reference
3314 		priority = dmr->dmr_priority;
3315 		ctxt = dmr->dmr_ctxt;
3316 		_dispatch_mach_reply_kevent_unregister(dm, dmr, false);
3317 	} else {
3318 		voucher = voucher_create_with_mach_msg(hdr);
3319 		priority = _voucher_get_priority(voucher);
3320 	}
3321 	dispatch_mach_msg_destructor_t destructor;
3322 	destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
3323 			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
3324 			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
3325 	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
3326 	dmsg->dmsg_voucher = voucher;
3327 	dmsg->dmsg_priority = priority;
3328 	dmsg->do_ctxt = ctxt;
3329 	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
3330 	_dispatch_voucher_debug("mach-msg[%p] create", voucher, dmsg);
3331 	_dispatch_voucher_ktrace_dmsg_push(dmsg);
3332 	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3333 }
3334 
3335 static inline mach_port_t
_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)3336 _dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
3337 {
3338 	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3339 	mach_port_t remote = hdr->msgh_remote_port;
3340 	return remote;
3341 }
3342 
3343 static inline void
_dispatch_mach_msg_disconnected(dispatch_mach_t dm,mach_port_t local_port,mach_port_t remote_port)3344 _dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
3345 		mach_port_t remote_port)
3346 {
3347 	mach_msg_header_t *hdr;
3348 	dispatch_mach_msg_t dmsg;
3349 	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3350 			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3351 	if (local_port) hdr->msgh_local_port = local_port;
3352 	if (remote_port) hdr->msgh_remote_port = remote_port;
3353 	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
3354 	return _dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3355 }
3356 
3357 static inline dispatch_mach_msg_t
_dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,dispatch_mach_reply_refs_t dmr)3358 _dispatch_mach_msg_create_reply_disconnected(dispatch_object_t dou,
3359 		dispatch_mach_reply_refs_t dmr)
3360 {
3361 	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3362 	if (dmsg && !dmsg->dmsg_reply) return NULL;
3363 	mach_msg_header_t *hdr;
3364 	dmsgr = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3365 			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3366 	if (dmsg) {
3367 		hdr->msgh_local_port = dmsg->dmsg_reply;
3368 		if (dmsg->dmsg_voucher) {
3369 			dmsgr->dmsg_voucher = _voucher_retain(dmsg->dmsg_voucher);
3370 		}
3371 		dmsgr->dmsg_priority = dmsg->dmsg_priority;
3372 		dmsgr->do_ctxt = dmsg->do_ctxt;
3373 	} else {
3374 		hdr->msgh_local_port = (mach_port_t)dmr->dmr_dkev->dk_kevent.ident;
3375 		dmsgr->dmsg_voucher = dmr->dmr_voucher;
3376 		dmr->dmr_voucher = NULL;  // transfer reference
3377 		dmsgr->dmsg_priority = dmr->dmr_priority;
3378 		dmsgr->do_ctxt = dmr->dmr_ctxt;
3379 	}
3380 	_dispatch_mach_msg_set_reason(dmsgr, 0, DISPATCH_MACH_DISCONNECTED);
3381 	return dmsgr;
3382 }
3383 
3384 DISPATCH_NOINLINE
3385 static void
_dispatch_mach_msg_not_sent(dispatch_mach_t dm,dispatch_object_t dou)3386 _dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
3387 {
3388 	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr;
3389 	dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3390 	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
3391 	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3392 	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
3393 }
3394 
// Tries to send the message carried by `dou` on channel `dm` via mach_msg().
//
// Returns a NULL object once the message has been pushed back onto the
// channel with its send result recorded (plus a DISCONNECTED reply stand-in
// when the send failed and a reply port was recorded). Returns the message
// itself when it has to be retried later: a pending checkin message could not
// be sent, the send-possible kevent is already armed, the send timed out, or
// work is required on the manager queue (flagged through dr->dm_needs_mgr).
3395 DISPATCH_NOINLINE
3396 static dispatch_object_t
_dispatch_mach_msg_send(dispatch_mach_t dm,dispatch_object_t dou)3397 _dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
3398 {
3399 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3400 	dispatch_mach_msg_t dmsg = dou._dmsg, dmsgr = NULL;
3401 	voucher_t voucher = dmsg->dmsg_voucher;
3402 	mach_voucher_t ipc_kvoucher = MACH_VOUCHER_NULL;
3403 	bool clear_voucher = false, kvoucher_move_send = false;
	// cleared here; set again on the paths below that need the manager queue
3404 	dr->dm_needs_mgr = 0;
3405 	if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
3406 		// send initial checkin message
3407 		if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
3408 				&_dispatch_mgr_q)) {
3409 			// send kevent must be uninstalled on the manager queue
3410 			dr->dm_needs_mgr = 1;
3411 			goto out;
3412 		}
		// recursive send of the checkin message; a non-NULL result means it
		// is still pending, so dmsg cannot go out yet
3413 		dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
3414 		if (slowpath(dr->dm_checkin)) {
3415 			goto out;
3416 		}
3417 	}
3418 	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3419 	mach_msg_return_t kr = 0;
3420 	mach_port_t reply = dmsg->dmsg_reply;
3421 	mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
	// DISPATCH_MACH_REGISTER_FOR_REPLY is set further down when a successful
	// send still needs its reply kevent registered on the manager queue; on
	// that re-entry the mach_msg() call is skipped and kr stays 0
3422 	if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
3423 		opts = MACH_SEND_MSG | (msg_opts & ~DISPATCH_MACH_OPTIONS_MASK);
3424 		if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
3425 				MACH_MSG_TYPE_MOVE_SEND_ONCE) {
			// every message except the checkin itself goes to the channel's
			// send port
3426 			if (dmsg != dr->dm_checkin) {
3427 				msg->msgh_remote_port = dr->dm_send;
3428 			}
3429 			if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
3430 				if (slowpath(!dm->dm_dkev)) {
3431 					_dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
3432 				}
3433 				if (fastpath(dm->dm_dkev)) {
					// a send-possible notification is already outstanding:
					// leave the message queued
3434 					if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
3435 						goto out;
3436 					}
3437 					opts |= MACH_SEND_NOTIFY;
3438 				}
3439 			}
			// mach_msg() below is called with a timeout argument of 0
3440 			opts |= MACH_SEND_TIMEOUT;
3441 			if (dmsg->dmsg_priority != _voucher_get_priority(voucher)) {
3442 				ipc_kvoucher = _voucher_create_mach_voucher_with_priority(
3443 						voucher, dmsg->dmsg_priority);
3444 			}
3445 			_dispatch_voucher_debug("mach-msg[%p] msg_set", voucher, dmsg);
3446 			if (ipc_kvoucher) {
3447 				kvoucher_move_send = true;
3448 				clear_voucher = _voucher_mach_msg_set_mach_voucher(msg,
3449 						ipc_kvoucher, kvoucher_move_send);
3450 			} else {
3451 				clear_voucher = _voucher_mach_msg_set(msg, voucher);
3452 			}
3453 		}
3454 		_voucher_activity_trace_msg(voucher, msg, send);
3455 		_dispatch_debug_machport(msg->msgh_remote_port);
3456 		if (reply) _dispatch_debug_machport(reply);
3457 		kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
3458 				MACH_PORT_NULL);
3459 		_dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, "
3460 				"opts 0x%x, msg_opts 0x%x, kvoucher 0x%08x, reply on 0x%08x: "
3461 				"%s - 0x%x", msg->msgh_remote_port, msg->msgh_id, dmsg->do_ctxt,
3462 				opts, msg_opts, msg->msgh_voucher_port, reply,
3463 				mach_error_string(kr), kr);
3464 		if (clear_voucher) {
3465 			if (kr == MACH_SEND_INVALID_VOUCHER && msg->msgh_voucher_port) {
3466 				DISPATCH_CRASH("Voucher port corruption");
3467 			}
3468 			mach_voucher_t kv;
3469 			kv = _voucher_mach_msg_clear(msg, kvoucher_move_send);
3470 			if (kvoucher_move_send) ipc_kvoucher = kv;
3471 		}
3472 	}
3473 	if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
3474 		if (opts & MACH_SEND_NOTIFY) {
3475 			_dispatch_debug("machport[0x%08x]: send-possible notification "
3476 					"armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
3477 			DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
3478 		} else {
3479 			// send kevent must be installed on the manager queue
3480 			dr->dm_needs_mgr = 1;
3481 		}
3482 		if (ipc_kvoucher) {
			// keep the priority-adjusted kernel voucher for the retry by
			// replacing the message's voucher with one wrapping it
3483 			_dispatch_kvoucher_debug("reuse on re-send", ipc_kvoucher);
3484 			voucher_t ipc_voucher;
3485 			ipc_voucher = _voucher_create_with_priority_and_mach_voucher(
3486 					voucher, dmsg->dmsg_priority, ipc_kvoucher);
3487 			_dispatch_voucher_debug("mach-msg[%p] replace voucher[%p]",
3488 					ipc_voucher, dmsg, voucher);
3489 			if (dmsg->dmsg_voucher) _voucher_release(dmsg->dmsg_voucher);
3490 			dmsg->dmsg_voucher = ipc_voucher;
3491 		}
3492 		goto out;
3493 	} else if (ipc_kvoucher && (kr || !kvoucher_move_send)) {
		// NOTE(review): presumably the kernel voucher is still owned here
		// because the send failed or it was not moved — confirm against
		// _voucher_mach_msg_clear()
3494 		_voucher_dealloc_mach_voucher(ipc_kvoucher);
3495 	}
	// successful send with a reply port that is not the channel's own
	// receive port: a per-reply kevent has to be registered
3496 	if (fastpath(!kr) && reply &&
3497 			!(dm->ds_dkev && dm->ds_dkev->dk_kevent.ident == reply)) {
3498 		if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
3499 			// reply receive kevent must be installed on the manager queue
3500 			dr->dm_needs_mgr = 1;
3501 			_dispatch_mach_msg_set_options(dmsg, msg_opts |
3502 					DISPATCH_MACH_REGISTER_FOR_REPLY);
3503 			goto out;
3504 		}
3505 		_dispatch_mach_reply_kevent_register(dm, reply, dmsg);
3506 	}
3507 	if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
3508 		_dispatch_mach_kevent_unregister(dm);
3509 	}
3510 	if (slowpath(kr)) {
3511 		// Send failed, so reply was never connected <rdar://problem/14309159>
3512 		dmsgr = _dispatch_mach_msg_create_reply_disconnected(dmsg, NULL);
3513 	}
	// record the send result and hand the message (and any DISCONNECTED
	// stand-in) back to the channel
3514 	_dispatch_mach_msg_set_reason(dmsg, kr, 0);
3515 	_dispatch_mach_push(dm, dmsg, dmsg->dmsg_priority);
3516 	if (dmsgr) _dispatch_mach_push(dm, dmsgr, dmsgr->dmsg_priority);
	// NULL tells the caller the message has been consumed
3517 	dmsg = NULL;
3518 out:
3519 	return (dispatch_object_t)dmsg;
3520 }
3521 
3522 DISPATCH_ALWAYS_INLINE
3523 static inline void
_dispatch_mach_send_push_wakeup(dispatch_mach_t dm,dispatch_object_t dou,bool wakeup)3524 _dispatch_mach_send_push_wakeup(dispatch_mach_t dm, dispatch_object_t dou,
3525 		bool wakeup)
3526 {
3527 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3528 	struct dispatch_object_s *prev, *dc = dou._do;
3529 	dc->do_next = NULL;
3530 
3531 	prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
3532 	if (fastpath(prev)) {
3533 		prev->do_next = dc;
3534 	} else {
3535 		dr->dm_head = dc;
3536 	}
3537 	if (wakeup || !prev) {
3538 		_dispatch_wakeup(dm);
3539 	}
3540 }
3541 
3542 DISPATCH_ALWAYS_INLINE
3543 static inline void
_dispatch_mach_send_push(dispatch_mach_t dm,dispatch_object_t dou)3544 _dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
3545 {
3546 	return _dispatch_mach_send_push_wakeup(dm, dou, false);
3547 }
3548 
// Drains the channel's send queue (dr->dm_head / dr->dm_tail).
//
// The callers visible in this file take dr->dm_sending (cmpxchg 0 -> 1)
// before calling. It is decremented again on exit, except on the send-barrier
// path, which returns early and leaves it held until the barrier has run.
// Items that cannot be processed yet are put back at the head of the queue.
3549 DISPATCH_NOINLINE
3550 static void
_dispatch_mach_send_drain(dispatch_mach_t dm)3551 _dispatch_mach_send_drain(dispatch_mach_t dm)
3552 {
3553 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3554 	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
3555 	while (dr->dm_tail) {
		// the enqueuer swaps dm_tail before it stores dm_head, so the head
		// may not be visible yet
3556 		_dispatch_wait_until(dc = fastpath(dr->dm_head));
3557 		do {
3558 			next_dc = fastpath(dc->do_next);
3559 			dr->dm_head = next_dc;
			// dc looks like the last item: try to mark the queue empty. If
			// the cmpxchg fails, a concurrent enqueue already replaced the
			// tail, so wait for it to link itself behind dc.
3560 			if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
3561 					relaxed)) {
3562 				_dispatch_wait_until(next_dc = fastpath(dc->do_next));
3563 				dr->dm_head = next_dc;
3564 			}
			// items without a vtable are continuations: send barriers,
			// sync waiters and reconnect requests
3565 			if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
3566 				if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3567 					// send barrier
3568 					// leave send queue locked until barrier has completed
3569 					return _dispatch_mach_push(dm, dc,
3570 							((dispatch_continuation_t)dc)->dc_priority);
3571 				}
3572 #if DISPATCH_MACH_SEND_SYNC
3573 				if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
3574 					_dispatch_thread_semaphore_signal(
3575 							(_dispatch_thread_semaphore_t)dc->do_ctxt);
3576 					continue;
3577 				}
3578 #endif // DISPATCH_MACH_SEND_SYNC
				// reconnect request: a false return means it has to run on
				// the manager queue, so stop and re-queue it
3579 				if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
3580 					goto out;
3581 				}
3582 				continue;
3583 			}
3584 			_dispatch_voucher_ktrace_dmsg_pop((dispatch_mach_msg_t)dc);
			// while a reconnect is pending or the channel is canceled,
			// messages are returned unsent instead of being sent
3585 			if (slowpath(dr->dm_disconnect_cnt) ||
3586 					slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3587 				_dispatch_mach_msg_not_sent(dm, dc);
3588 				continue;
3589 			}
			// a non-NULL result is the message that still has to be retried
3590 			if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
3591 				goto out;
3592 			}
3593 		} while ((dc = next_dc));
3594 	}
3595 out:
3596 	// if this is not a complete drain, we must undo some things
3597 	if (slowpath(dc)) {
		// dc goes back to the front: if the queue was left empty, dc becomes
		// the tail as well; otherwise it is linked before the current head
3598 		if (!next_dc &&
3599 				!dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
3600 			// wait for enqueue slow path to finish
3601 			_dispatch_wait_until(next_dc = fastpath(dr->dm_head));
3602 			dc->do_next = next_dc;
3603 		}
3604 		dr->dm_head = dc;
3605 	}
3606 	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3607 	_dispatch_wakeup(dm);
3608 }
3609 
3610 static inline void
_dispatch_mach_send(dispatch_mach_t dm)3611 _dispatch_mach_send(dispatch_mach_t dm)
3612 {
3613 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3614 	if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
3615 			dm_sending, 0, 1, acquire))) {
3616 		return;
3617 	}
3618 	_dispatch_object_debug(dm, "%s", __func__);
3619 	_dispatch_mach_send_drain(dm);
3620 }
3621 
3622 DISPATCH_NOINLINE
3623 static void
_dispatch_mach_merge_kevent(dispatch_mach_t dm,const struct kevent64_s * ke)3624 _dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
3625 {
3626 	if (!(ke->fflags & dm->ds_pending_data_mask)) {
3627 		return;
3628 	}
3629 	_dispatch_mach_send(dm);
3630 }
3631 
3632 static inline mach_msg_option_t
_dispatch_mach_checkin_options(void)3633 _dispatch_mach_checkin_options(void)
3634 {
3635 	mach_msg_option_t options = 0;
3636 #if DISPATCH_USE_CHECKIN_NOIMPORTANCE
3637 	options = MACH_SEND_NOIMPORTANCE; // <rdar://problem/16996737>
3638 #endif
3639 	return options;
3640 }
3641 
3642 
3643 static inline mach_msg_option_t
_dispatch_mach_send_options(void)3644 _dispatch_mach_send_options(void)
3645 {
3646 	mach_msg_option_t options = 0;
3647 	return options;
3648 }
3649 
3650 DISPATCH_NOINLINE
3651 void
dispatch_mach_send(dispatch_mach_t dm,dispatch_mach_msg_t dmsg,mach_msg_option_t options)3652 dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
3653 		mach_msg_option_t options)
3654 {
3655 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3656 	if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
3657 		DISPATCH_CLIENT_CRASH("Message already enqueued");
3658 	}
3659 	dispatch_retain(dmsg);
3660 	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
3661 	options |= _dispatch_mach_send_options();
3662 	_dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
3663 	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3664 	dmsg->dmsg_reply = (MACH_MSGH_BITS_LOCAL(msg->msgh_bits) ==
3665 			MACH_MSG_TYPE_MAKE_SEND_ONCE &&
3666 			MACH_PORT_VALID(msg->msgh_local_port) ? msg->msgh_local_port :
3667 			MACH_PORT_NULL);
3668 	bool is_reply = (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) ==
3669 			MACH_MSG_TYPE_MOVE_SEND_ONCE);
3670 	dmsg->dmsg_priority = _dispatch_priority_propagate();
3671 	dmsg->dmsg_voucher = _voucher_copy();
3672 	_dispatch_voucher_debug("mach-msg[%p] set", dmsg->dmsg_voucher, dmsg);
3673 	if ((!is_reply && slowpath(dr->dm_tail)) ||
3674 			slowpath(dr->dm_disconnect_cnt) ||
3675 			slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
3676 			slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
3677 					acquire))) {
3678 		_dispatch_voucher_ktrace_dmsg_push(dmsg);
3679 		return _dispatch_mach_send_push(dm, dmsg);
3680 	}
3681 	if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
3682 		(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3683 		_dispatch_voucher_ktrace_dmsg_push(dmsg);
3684 		return _dispatch_mach_send_push_wakeup(dm, dmsg, true);
3685 	}
3686 	if (!is_reply && slowpath(dr->dm_tail)) {
3687 		return _dispatch_mach_send_drain(dm);
3688 	}
3689 	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3690 	_dispatch_wakeup(dm);
3691 }
3692 
3693 static void
_dispatch_mach_disconnect(dispatch_mach_t dm)3694 _dispatch_mach_disconnect(dispatch_mach_t dm)
3695 {
3696 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3697 	if (dm->dm_dkev) {
3698 		_dispatch_mach_kevent_unregister(dm);
3699 	}
3700 	if (MACH_PORT_VALID(dr->dm_send)) {
3701 		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
3702 	}
3703 	dr->dm_send = MACH_PORT_NULL;
3704 	if (dr->dm_checkin) {
3705 		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
3706 		dr->dm_checkin = NULL;
3707 	}
3708 	if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3709 		dispatch_mach_reply_refs_t dmr, tmp;
3710 		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dmr_list, tmp){
3711 			_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3712 		}
3713 	}
3714 }
3715 
3716 DISPATCH_NOINLINE
3717 static bool
_dispatch_mach_cancel(dispatch_mach_t dm)3718 _dispatch_mach_cancel(dispatch_mach_t dm)
3719 {
3720 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3721 	if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
3722 		return false;
3723 	}
3724 	_dispatch_object_debug(dm, "%s", __func__);
3725 	_dispatch_mach_disconnect(dm);
3726 	if (dm->ds_dkev) {
3727 		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
3728 		_dispatch_source_kevent_unregister((dispatch_source_t)dm);
3729 		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3730 	}
3731 	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3732 	return true;
3733 }
3734 
3735 DISPATCH_NOINLINE
3736 static bool
_dispatch_mach_reconnect_invoke(dispatch_mach_t dm,dispatch_object_t dou)3737 _dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
3738 {
3739 	if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3740 		if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
3741 			// send/reply kevents must be uninstalled on the manager queue
3742 			return false;
3743 		}
3744 	}
3745 	_dispatch_mach_disconnect(dm);
3746 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3747 	dr->dm_checkin = dou._dc->dc_data;
3748 	dr->dm_send = (mach_port_t)dou._dc->dc_other;
3749 	_dispatch_continuation_free(dou._dc);
3750 	(void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
3751 	_dispatch_object_debug(dm, "%s", __func__);
3752 	return true;
3753 }
3754 
3755 DISPATCH_NOINLINE
3756 void
dispatch_mach_reconnect(dispatch_mach_t dm,mach_port_t send,dispatch_mach_msg_t checkin)3757 dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
3758 		dispatch_mach_msg_t checkin)
3759 {
3760 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3761 	(void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
3762 	if (MACH_PORT_VALID(send) && checkin) {
3763 		dispatch_retain(checkin);
3764 		mach_msg_option_t options = _dispatch_mach_checkin_options();
3765 		_dispatch_mach_msg_set_options(checkin, options);
3766 		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3767 	} else {
3768 		checkin = NULL;
3769 		dr->dm_checkin_port = MACH_PORT_NULL;
3770 	}
3771 	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3772 	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3773 	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
3774 	dc->dc_ctxt = dc;
3775 	dc->dc_data = checkin;
3776 	dc->dc_other = (void*)(uintptr_t)send;
3777 	return _dispatch_mach_send_push(dm, dc);
3778 }
3779 
#if DISPATCH_MACH_SEND_SYNC
// Blocks the calling thread until the send queue has drained up to this
// point: a stack-allocated marker holding the thread's semaphore is queued,
// and the drain signals that semaphore when it reaches the marker.
DISPATCH_NOINLINE
static void
_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
{
	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_object_s waiter = {
		.do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
		.do_ctxt = (void*)sema,
	};

	_dispatch_mach_send_push(dm, &waiter);
	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}
#endif // DISPATCH_MACH_SEND_SYNC
3795 
3796 DISPATCH_NOINLINE
3797 mach_port_t
dispatch_mach_get_checkin_port(dispatch_mach_t dm)3798 dispatch_mach_get_checkin_port(dispatch_mach_t dm)
3799 {
3800 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3801 	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3802 		return MACH_PORT_DEAD;
3803 	}
3804 	return dr->dm_checkin_port;
3805 }
3806 
3807 DISPATCH_NOINLINE
3808 static void
_dispatch_mach_connect_invoke(dispatch_mach_t dm)3809 _dispatch_mach_connect_invoke(dispatch_mach_t dm)
3810 {
3811 	dispatch_mach_refs_t dr = dm->ds_refs;
3812 	_dispatch_client_callout4(dr->dm_handler_ctxt,
3813 			DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
3814 	dm->dm_connect_handler_called = 1;
3815 }
3816 
3817 DISPATCH_NOINLINE
3818 void
_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)3819 _dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)
3820 {
3821 	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3822 	dispatch_mach_refs_t dr = dm->ds_refs;
3823 	mach_error_t err;
3824 	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);
3825 
3826 	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
3827 	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3828 	_dispatch_voucher_ktrace_dmsg_pop(dmsg);
3829 	_dispatch_voucher_debug("mach-msg[%p] adopt", dmsg->dmsg_voucher, dmsg);
3830 	_dispatch_adopt_priority_and_replace_voucher(dmsg->dmsg_priority,
3831 			dmsg->dmsg_voucher, DISPATCH_PRIORITY_ENFORCE);
3832 	dmsg->dmsg_voucher = NULL;
3833 	if (slowpath(!dm->dm_connect_handler_called)) {
3834 		_dispatch_mach_connect_invoke(dm);
3835 	}
3836 	_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
3837 			dr->dm_handler_func);
3838 	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3839 	_dispatch_introspection_queue_item_complete(dmsg);
3840 	dispatch_release(dmsg);
3841 }
3842 
3843 DISPATCH_NOINLINE
3844 void
_dispatch_mach_barrier_invoke(void * ctxt)3845 _dispatch_mach_barrier_invoke(void *ctxt)
3846 {
3847 	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3848 	dispatch_mach_refs_t dr = dm->ds_refs;
3849 	struct dispatch_continuation_s *dc = ctxt;
3850 	void *context = dc->dc_data;
3851 	dispatch_function_t barrier = dc->dc_other;
3852 	bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);
3853 
3854 	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3855 	if (slowpath(!dm->dm_connect_handler_called)) {
3856 		_dispatch_mach_connect_invoke(dm);
3857 	}
3858 	_dispatch_client_callout(context, barrier);
3859 	_dispatch_client_callout4(dr->dm_handler_ctxt,
3860 			DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
3861 	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3862 	if (send_barrier) {
3863 		(void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
3864 	}
3865 }
3866 
3867 DISPATCH_NOINLINE
3868 void
dispatch_mach_send_barrier_f(dispatch_mach_t dm,void * context,dispatch_function_t barrier)3869 dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
3870 		dispatch_function_t barrier)
3871 {
3872 	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3873 	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
3874 	dc->dc_func = _dispatch_mach_barrier_invoke;
3875 	dc->dc_ctxt = dc;
3876 	dc->dc_data = context;
3877 	dc->dc_other = barrier;
3878 	_dispatch_continuation_voucher_set(dc, 0);
3879 	_dispatch_continuation_priority_set(dc, 0, 0);
3880 
3881 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3882 	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
3883 			dm_sending, 0, 1, acquire))) {
3884 		return _dispatch_mach_send_push(dm, dc);
3885 	}
3886 	// leave send queue locked until barrier has completed
3887 	return _dispatch_mach_push(dm, dc, dc->dc_priority);
3888 }
3889 
3890 DISPATCH_NOINLINE
3891 void
dispatch_mach_receive_barrier_f(dispatch_mach_t dm,void * context,dispatch_function_t barrier)3892 dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
3893 		dispatch_function_t barrier)
3894 {
3895 	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3896 	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3897 	dc->dc_func = _dispatch_mach_barrier_invoke;
3898 	dc->dc_ctxt = dc;
3899 	dc->dc_data = context;
3900 	dc->dc_other = barrier;
3901 	_dispatch_continuation_voucher_set(dc, 0);
3902 	_dispatch_continuation_priority_set(dc, 0, 0);
3903 
3904 	return _dispatch_mach_push(dm, dc, dc->dc_priority);
3905 }
3906 
3907 DISPATCH_NOINLINE
3908 void
dispatch_mach_send_barrier(dispatch_mach_t dm,dispatch_block_t barrier)3909 dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3910 {
3911 	dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
3912 			_dispatch_call_block_and_release);
3913 }
3914 
3915 DISPATCH_NOINLINE
3916 void
dispatch_mach_receive_barrier(dispatch_mach_t dm,dispatch_block_t barrier)3917 dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3918 {
3919 	dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
3920 			_dispatch_call_block_and_release);
3921 }
3922 
3923 DISPATCH_NOINLINE
3924 static void
_dispatch_mach_cancel_invoke(dispatch_mach_t dm)3925 _dispatch_mach_cancel_invoke(dispatch_mach_t dm)
3926 {
3927 	dispatch_mach_refs_t dr = dm->ds_refs;
3928 	if (slowpath(!dm->dm_connect_handler_called)) {
3929 		_dispatch_mach_connect_invoke(dm);
3930 	}
3931 	_dispatch_client_callout4(dr->dm_handler_ctxt,
3932 			DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
3933 	dm->dm_cancel_handler_called = 1;
3934 	_dispatch_release(dm); // the retain is done at creation time
3935 }
3936 
3937 DISPATCH_NOINLINE
3938 void
dispatch_mach_cancel(dispatch_mach_t dm)3939 dispatch_mach_cancel(dispatch_mach_t dm)
3940 {
3941 	dispatch_source_cancel((dispatch_source_t)dm);
3942 }
3943 
// Performs one pass of mach channel work for `dou` on the current queue.
//
// Returns the queue on which the invoke has to be re-driven (the manager
// queue or the channel's target queue), or NULL when there is nothing further
// to do from the current queue. `sema_ptr` is unused.
3944 DISPATCH_ALWAYS_INLINE
3945 static inline dispatch_queue_t
_dispatch_mach_invoke2(dispatch_object_t dou,_dispatch_thread_semaphore_t * sema_ptr DISPATCH_UNUSED)3946 _dispatch_mach_invoke2(dispatch_object_t dou,
3947 		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
3948 {
3949 	dispatch_mach_t dm = dou._dm;
3950 
3951 	// This function performs all mach channel actions. Each action is
3952 	// responsible for verifying that it takes place on the appropriate queue.
3953 	// If the current queue is not the correct queue for this action, the
3954 	// correct queue will be returned and the invoke will be re-driven on that
3955 	// queue.
3956 
3957 	// The order of tests here in invoke and in probe should be consistent.
3958 
3959 	dispatch_queue_t dq = _dispatch_queue_get_current();
3960 	dispatch_mach_send_refs_t dr = dm->dm_refs;
3961 
3962 	if (slowpath(!dm->ds_is_installed)) {
3963 		// The channel needs to be installed on the manager queue.
3964 		if (dq != &_dispatch_mgr_q) {
3965 			return &_dispatch_mgr_q;
3966 		}
3967 		if (dm->ds_dkev) {
3968 			_dispatch_source_kevent_register((dispatch_source_t)dm);
3969 		}
3970 		dm->ds_is_installed = true;
		// send anything that was queued before installation
3971 		_dispatch_mach_send(dm);
3972 		// Apply initial target queue change
3973 		_dispatch_queue_drain(dou);
3974 		if (dm->dq_items_tail) {
3975 			return dm->do_targetq;
3976 		}
3977 	} else if (dm->dq_items_tail) {
3978 		// The channel has pending messages to deliver to the target queue.
3979 		if (dq != dm->do_targetq) {
3980 			return dm->do_targetq;
3981 		}
		// remember the target queue so a change made by a drained item can
		// be detected
3982 		dispatch_queue_t tq = dm->do_targetq;
3983 		if (slowpath(_dispatch_queue_drain(dou))) {
3984 			DISPATCH_CLIENT_CRASH("Sync onto mach channel");
3985 		}
3986 		if (slowpath(tq != dm->do_targetq)) {
3987 			// An item on the channel changed the target queue
3988 			return dm->do_targetq;
3989 		}
3990 	} else if (dr->dm_sending) {
3991 		// Sending and uninstallation below require the send lock, the channel
3992 		// will be woken up when the lock is dropped <rdar://15132939&15203957>
3993 		return NULL;
3994 	} else if (dr->dm_tail) {
3995 		if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
3996 				(dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
3997 			// Send/reply kevents need to be installed or uninstalled
3998 			if (dq != &_dispatch_mgr_q) {
3999 				return &_dispatch_mgr_q;
4000 			}
4001 		}
		// while the send-possible kevent is armed, only cancellation or a
		// pending reconnect triggers a send attempt
4002 		if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
4003 				(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
4004 			// The channel has pending messages to send.
4005 			_dispatch_mach_send(dm);
4006 		}
4007 	} else if (dm->ds_atomic_flags & DSF_CANCELED){
4008 		// The channel has been cancelled and needs to be uninstalled from the
4009 		// manager queue. After uninstallation, the cancellation handler needs
4010 		// to be delivered to the target queue.
4011 		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
4012 				!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
4013 			if (dq != &_dispatch_mgr_q) {
4014 				return &_dispatch_mgr_q;
4015 			}
			// _dispatch_mach_cancel() returns false when it cannot take
			// dm_sending; nothing more can be done in this pass
4016 			if (!_dispatch_mach_cancel(dm)) {
4017 				return NULL;
4018 			}
4019 		}
4020 		if (!dm->dm_cancel_handler_called) {
4021 			if (dq != dm->do_targetq) {
4022 				return dm->do_targetq;
4023 			}
4024 			_dispatch_mach_cancel_invoke(dm);
4025 		}
4026 	}
4027 	return NULL;
4028 }
4029 
4030 DISPATCH_NOINLINE
4031 void
_dispatch_mach_invoke(dispatch_mach_t dm)4032 _dispatch_mach_invoke(dispatch_mach_t dm)
4033 {
4034 	_dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2);
4035 }
4036 
4037 unsigned long
_dispatch_mach_probe(dispatch_mach_t dm)4038 _dispatch_mach_probe(dispatch_mach_t dm)
4039 {
4040 	// This function determines whether the mach channel needs to be invoked.
4041 	// The order of tests here in probe and in invoke should be consistent.
4042 
4043 	dispatch_mach_send_refs_t dr = dm->dm_refs;
4044 
4045 	if (slowpath(!dm->ds_is_installed)) {
4046 		// The channel needs to be installed on the manager queue.
4047 		return true;
4048 	} else if (_dispatch_queue_class_probe(dm)) {
4049 		// The source has pending messages to deliver to the target queue.
4050 		return true;
4051 	} else if (dr->dm_sending) {
4052 		// Sending and uninstallation below require the send lock, the channel
4053 		// will be woken up when the lock is dropped <rdar://15132939&15203957>
4054 		return false;
4055 	} else if (dr->dm_tail &&
4056 			(!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
4057 			(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
4058 		// The channel has pending messages to send.
4059 		return true;
4060 	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
4061 		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
4062 				!TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
4063 				!dm->dm_cancel_handler_called) {
4064 			// The channel needs to be uninstalled from the manager queue, or
4065 			// the cancellation handler needs to be delivered to the target
4066 			// queue.
4067 			return true;
4068 		}
4069 	}
4070 	// Nothing to do.
4071 	return false;
4072 }
4073 
4074 #pragma mark -
4075 #pragma mark dispatch_mach_msg_t
4076 
4077 dispatch_mach_msg_t
dispatch_mach_msg_create(mach_msg_header_t * msg,size_t size,dispatch_mach_msg_destructor_t destructor,mach_msg_header_t ** msg_ptr)4078 dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
4079 		dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
4080 {
4081 	if (slowpath(size < sizeof(mach_msg_header_t)) ||
4082 			slowpath(destructor && !msg)) {
4083 		DISPATCH_CLIENT_CRASH("Empty message");
4084 	}
4085 	dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
4086 			sizeof(struct dispatch_mach_msg_s) +
4087 			(destructor ? 0 : size - sizeof(dmsg->dmsg_msg)));
4088 	if (destructor) {
4089 		dmsg->dmsg_msg = msg;
4090 	} else if (msg) {
4091 		memcpy(dmsg->dmsg_buf, msg, size);
4092 	}
4093 	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
4094 	dmsg->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
4095 			false);
4096 	dmsg->dmsg_destructor = destructor;
4097 	dmsg->dmsg_size = size;
4098 	if (msg_ptr) {
4099 		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
4100 	}
4101 	return dmsg;
4102 }
4103 
4104 void
_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)4105 _dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
4106 {
4107 	if (dmsg->dmsg_voucher) {
4108 		_voucher_release(dmsg->dmsg_voucher);
4109 		dmsg->dmsg_voucher = NULL;
4110 	}
4111 	switch (dmsg->dmsg_destructor) {
4112 	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
4113 		break;
4114 	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
4115 		free(dmsg->dmsg_msg);
4116 		break;
4117 	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
4118 		mach_vm_size_t vm_size = dmsg->dmsg_size;
4119 		mach_vm_address_t vm_addr = (uintptr_t)dmsg->dmsg_msg;
4120 		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
4121 				vm_addr, vm_size));
4122 		break;
4123 	}}
4124 }
4125 
4126 static inline mach_msg_header_t*
_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)4127 _dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
4128 {
4129 	return dmsg->dmsg_destructor ? dmsg->dmsg_msg :
4130 		(mach_msg_header_t*)(uintptr_t)dmsg->dmsg_buf;
4131 }
4132 
4133 mach_msg_header_t*
dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg,size_t * size_ptr)4134 dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
4135 {
4136 	if (size_ptr) {
4137 		*size_ptr = dmsg->dmsg_size;
4138 	}
4139 	return _dispatch_mach_msg_get_msg(dmsg);
4140 }
4141 
4142 size_t
_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg,char * buf,size_t bufsiz)4143 _dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
4144 {
4145 	size_t offset = 0;
4146 	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4147 			dx_kind(dmsg), dmsg);
4148 	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
4149 			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
4150 	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
4151 			"msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->dmsg_buf);
4152 	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
4153 	if (hdr->msgh_id) {
4154 		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
4155 				hdr->msgh_id);
4156 	}
4157 	if (hdr->msgh_size) {
4158 		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
4159 				hdr->msgh_size);
4160 	}
4161 	if (hdr->msgh_bits) {
4162 		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
4163 				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
4164 				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
4165 		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
4166 			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
4167 					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
4168 		}
4169 		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
4170 	}
4171 	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
4172 		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
4173 				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
4174 	} else if (hdr->msgh_local_port) {
4175 		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
4176 				hdr->msgh_local_port);
4177 	} else if (hdr->msgh_remote_port) {
4178 		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
4179 				hdr->msgh_remote_port);
4180 	} else {
4181 		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
4182 	}
4183 	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
4184 	return offset;
4185 }
4186 
4187 #pragma mark -
4188 #pragma mark dispatch_mig_server
4189 
/*
 * dispatch_mig_server
 *
 * Receives Mach messages on the port named by ds->ds_ident_hack and passes
 * each one to `callback` together with a reply buffer. When the callback
 * leaves a remote port in the reply header, the reply is sent by the next
 * mach_msg() call, which also performs the next receive.
 *
 * ds        source whose ds_ident_hack is used as the receive port name and
 *           whose suspended state is polled on every iteration
 * maxmsgsz  caller's maximum message size; each of the two stack buffers is
 *           maxmsgsz + MAX_TRAILER_SIZE bytes
 * callback  invoked as callback(request header, reply header); a false
 *           return makes this function destroy the request
 *
 * Returns kr: 0 if mach_msg() was never called, otherwise the result of the
 * last loop-level mach_msg() call, except that MACH_RCV_TIMED_OUT is
 * replaced by MACH_MSG_SUCCESS when a reply was being sent or a message had
 * already been received during this invocation.
 */
4190 mach_msg_return_t
dispatch_mig_server(dispatch_source_t ds,size_t maxmsgsz,dispatch_mig_callback_t callback)4191 dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
4192 		dispatch_mig_callback_t callback)
4193 {
4194 	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
4195 		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
4196 		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0) | MACH_RCV_VOUCHER;
	// options: base flags used for every call;
	// tmp_options: base flags plus any send flags for the next call only
4197 	mach_msg_options_t tmp_options;
4198 	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
4199 	mach_msg_return_t kr = 0;
4200 	uint64_t assertion_token = 0;
4201 	unsigned int cnt = 1000; // do not stall out serial queues
4202 	boolean_t demux_success;
4203 	bool received = false;
4204 	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
4205 
4206 	// XXX FIXME -- allocate these elsewhere
4207 	bufRequest = alloca(rcv_size);
4208 	bufReply = alloca(rcv_size);
4209 	bufReply->Head.msgh_size = 0;
4210 	bufRequest->RetCode = 0;
4211 
4212 #if DISPATCH_DEBUG
4213 	options |= MACH_RCV_LARGE; // rdar://problem/8422992
4214 #endif
4215 	tmp_options = options;
4216 	// XXX FIXME -- change this to not starve out the target queue
4217 	for (;;) {
		// Once ds is suspended or the iteration counter runs out, receiving
		// is switched off for good. If no reply is pending we are done;
		// otherwise the mach_msg() below is a final send-only call.
4218 		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
4219 			options &= ~MACH_RCV_MSG;
4220 			tmp_options &= ~MACH_RCV_MSG;
4221 
4222 			if (!(tmp_options & MACH_SEND_MSG)) {
4223 				goto out;
4224 			}
4225 		}
		// bufReply is both the message to send (when MACH_SEND_MSG is set)
		// and the buffer the next message is received into.
4226 		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
4227 				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
4228 
		// Drop the one-shot send flags; they are re-added at the bottom of
		// the loop only if the next callback produces a reply.
4229 		tmp_options = options;
4230 
4231 		if (slowpath(kr)) {
4232 			switch (kr) {
4233 			case MACH_SEND_INVALID_DEST:
4234 			case MACH_SEND_TIMED_OUT:
4235 				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
4236 					mach_msg_destroy(&bufReply->Head);
4237 				}
4238 				break;
4239 			case MACH_RCV_TIMED_OUT:
4240 				// Don't return an error if a message was sent this time or
4241 				// a message was successfully received previously
4242 				// rdar://problems/7363620&7791738
4243 				if(bufReply->Head.msgh_remote_port || received) {
4244 					kr = MACH_MSG_SUCCESS;
4245 				}
4246 				break;
4247 			case MACH_RCV_INVALID_NAME:
4248 				break;
4249 #if DISPATCH_DEBUG
4250 			case MACH_RCV_TOO_LARGE:
4251 				// receive messages that are too large and log their id and size
4252 				// rdar://problem/8422992
4253 				tmp_options &= ~MACH_RCV_LARGE;
4254 				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
4255 				void *large_buf = malloc(large_size);
4256 				if (large_buf) {
4257 					rcv_size = large_size;
4258 					bufReply = large_buf;
4259 				}
4260 				if (!mach_msg(&bufReply->Head, tmp_options, 0,
4261 						(mach_msg_size_t)rcv_size,
4262 						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
4263 					_dispatch_log("BUG in libdispatch client: "
4264 							"dispatch_mig_server received message larger than "
4265 							"requested size %zd: id = 0x%x, size = %d",
4266 							maxmsgsz, bufReply->Head.msgh_id,
4267 							bufReply->Head.msgh_size);
4268 				}
				// NOTE: when large_buf was allocated, bufReply still points
				// at it after this free(); that is only safe because every
				// path from here leaves via "goto out" without using bufReply.
4269 				if (large_buf) {
4270 					free(large_buf);
4271 				}
4272 				// fall through
4273 #endif
4274 			default:
4275 				_dispatch_bug_mach_client(
4276 						"dispatch_mig_server: mach_msg() failed", kr);
4277 				break;
4278 			}
			// any non-zero mach_msg() result ends the loop
4279 			goto out;
4280 		}
4281 
		// Receiving was disabled above, so the call just made was the final
		// send-only one.
4282 		if (!(tmp_options & MACH_RCV_MSG)) {
4283 			goto out;
4284 		}
4285 
		// Release the assertion taken for the previous request, if any.
4286 		if (assertion_token) {
4287 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4288 			int r = proc_importance_assertion_complete(assertion_token);
4289 			(void)dispatch_assume_zero(r);
4290 #endif
4291 			assertion_token = 0;
4292 		}
4293 		received = true;
4294 
		// The buffer just received into becomes the request; the other
		// buffer is handed to the callback to build the reply in.
4295 		bufTemp = bufRequest;
4296 		bufRequest = bufReply;
4297 		bufReply = bufTemp;
4298 
4299 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4300 		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
4301 				NULL, &assertion_token);
		// an EIO result is deliberately not reported
4302 		if (r && slowpath(r != EIO)) {
4303 			(void)dispatch_assume_zero(r);
4304 		}
4305 #endif
4306 		_voucher_replace(voucher_create_with_mach_msg(&bufRequest->Head));
4307 		demux_success = callback(&bufRequest->Head, &bufReply->Head);
4308 
4309 		if (!demux_success) {
4310 			// destroy the request - but not the reply port
4311 			bufRequest->Head.msgh_remote_port = 0;
4312 			mach_msg_destroy(&bufRequest->Head);
4313 		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
4314 			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
4315 			// is present
4316 			if (slowpath(bufReply->RetCode)) {
				// MIG_NO_REPLY: skip the send setup below and go straight
				// to the next receive, leaving the request untouched
4317 				if (bufReply->RetCode == MIG_NO_REPLY) {
4318 					continue;
4319 				}
4320 
4321 				// destroy the request - but not the reply port
4322 				bufRequest->Head.msgh_remote_port = 0;
4323 				mach_msg_destroy(&bufRequest->Head);
4324 			}
4325 		}
4326 
		// A reply destination is set: have the next mach_msg() send the
		// reply as well as receive. MACH_SEND_TIMEOUT is added unless the
		// reply port disposition is MOVE_SEND_ONCE.
4327 		if (bufReply->Head.msgh_remote_port) {
4328 			tmp_options |= MACH_SEND_MSG;
4329 			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
4330 					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
4331 				tmp_options |= MACH_SEND_TIMEOUT;
4332 			}
4333 		}
4334 	}
4335 
4336 out:
	// Release the assertion still held for the last request, if any.
4337 	if (assertion_token) {
4338 #if DISPATCH_USE_IMPORTANCE_ASSERTION
4339 		int r = proc_importance_assertion_complete(assertion_token);
4340 		(void)dispatch_assume_zero(r);
4341 #endif
4342 	}
4343 
4344 	return kr;
4345 }
4346 
4347 #endif /* HAVE_MACH */
4348 
4349 #pragma mark -
4350 #pragma mark dispatch_source_debug
4351 
4352 DISPATCH_NOINLINE
4353 static const char *
_evfiltstr(short filt)4354 _evfiltstr(short filt)
4355 {
4356 	switch (filt) {
4357 #define _evfilt2(f) case (f): return #f
4358 	_evfilt2(EVFILT_READ);
4359 	_evfilt2(EVFILT_WRITE);
4360 	_evfilt2(EVFILT_AIO);
4361 	_evfilt2(EVFILT_VNODE);
4362 	_evfilt2(EVFILT_PROC);
4363 	_evfilt2(EVFILT_SIGNAL);
4364 	_evfilt2(EVFILT_TIMER);
4365 #ifdef EVFILT_VM
4366 	_evfilt2(EVFILT_VM);
4367 #endif
4368 #ifdef EVFILT_MEMORYSTATUS
4369 	_evfilt2(EVFILT_MEMORYSTATUS);
4370 #endif
4371 #if HAVE_MACH
4372 	_evfilt2(EVFILT_MACHPORT);
4373 	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
4374 #endif
4375 	_evfilt2(EVFILT_FS);
4376 	_evfilt2(EVFILT_USER);
4377 
4378 	_evfilt2(DISPATCH_EVFILT_TIMER);
4379 	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
4380 	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
4381 	default:
4382 		return "EVFILT_missing";
4383 	}
4384 }
4385 
4386 static size_t
_dispatch_source_debug_attr(dispatch_source_t ds,char * buf,size_t bufsiz)4387 _dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4388 {
4389 	dispatch_queue_t target = ds->do_targetq;
4390 	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
4391 			"pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
4392 			target && target->dq_label ? target->dq_label : "", target,
4393 			ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask);
4394 }
4395 
4396 static size_t
_dispatch_timer_debug_attr(dispatch_source_t ds,char * buf,size_t bufsiz)4397 _dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4398 {
4399 	dispatch_source_refs_t dr = ds->ds_refs;
4400 	return dsnprintf(buf, bufsiz, "timer = { target = 0x%zx, deadline = 0x%zx,"
4401 			" last_fire = 0x%zx, interval = 0x%zx, flags = 0x%lx }, ",
4402 			ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
4403 			ds_timer(dr).interval, ds_timer(dr).flags);
4404 }
4405 
4406 size_t
_dispatch_source_debug(dispatch_source_t ds,char * buf,size_t bufsiz)4407 _dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
4408 {
4409 	size_t offset = 0;
4410 	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4411 			dx_kind(ds), ds);
4412 	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
4413 	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
4414 	if (ds->ds_is_timer) {
4415 		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
4416 	}
4417 	offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }",
4418 			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
4419 	return offset;
4420 }
4421 
4422 static size_t
_dispatch_mach_debug_attr(dispatch_mach_t dm,char * buf,size_t bufsiz)4423 _dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
4424 {
4425 	dispatch_queue_t target = dm->do_targetq;
4426 	return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
4427 			"send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
4428 			"sending = %d, disconnected = %d, canceled = %d ",
4429 			target && target->dq_label ? target->dq_label : "", target,
4430 			dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
4431 			dm->dm_refs->dm_send,
4432 			dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
4433 			dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
4434 			" (armed)" : "", dm->dm_refs->dm_checkin_port,
4435 			dm->dm_refs->dm_checkin ? " (pending)" : "",
4436 			dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
4437 			(bool)(dm->ds_atomic_flags & DSF_CANCELED));
4438 }
4439 size_t
_dispatch_mach_debug(dispatch_mach_t dm,char * buf,size_t bufsiz)4440 _dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
4441 {
4442 	size_t offset = 0;
4443 	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4444 			dm->dq_label && !dm->dm_cancel_handler_called ? dm->dq_label :
4445 			dx_kind(dm), dm);
4446 	offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
4447 	offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
4448 	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
4449 	return offset;
4450 }
4451 
4452 #if DISPATCH_DEBUG
4453 static void
_dispatch_kevent_debug(struct kevent64_s * kev,const char * str)4454 _dispatch_kevent_debug(struct kevent64_s* kev, const char* str)
4455 {
4456 	_dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, "
4457 			"fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, "
4458 			"ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter),
4459 			kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0],
4460 			kev->ext[1], str);
4461 }
4462 
4463 static void
_dispatch_kevent_debugger2(void * context)4464 _dispatch_kevent_debugger2(void *context)
4465 {
4466 	struct sockaddr sa;
4467 	socklen_t sa_len = sizeof(sa);
4468 	int c, fd = (int)(long)context;
4469 	unsigned int i;
4470 	dispatch_kevent_t dk;
4471 	dispatch_source_t ds;
4472 	dispatch_source_refs_t dr;
4473 	FILE *debug_stream;
4474 
4475 	c = accept(fd, &sa, &sa_len);
4476 	if (c == -1) {
4477 		if (errno != EAGAIN) {
4478 			(void)dispatch_assume_zero(errno);
4479 		}
4480 		return;
4481 	}
4482 #if 0
4483 	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
4484 	if (r == -1) {
4485 		(void)dispatch_assume_zero(errno);
4486 	}
4487 #endif
4488 	debug_stream = fdopen(c, "a");
4489 	if (!dispatch_assume(debug_stream)) {
4490 		close(c);
4491 		return;
4492 	}
4493 
4494 	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
4495 	fprintf(debug_stream, "Content-type: text/html\r\n");
4496 	fprintf(debug_stream, "Pragma: nocache\r\n");
4497 	fprintf(debug_stream, "\r\n");
4498 	fprintf(debug_stream, "<html>\n");
4499 	fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
4500 	fprintf(debug_stream, "<body>\n<ul>\n");
4501 
4502 	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
4503 	//		"<td>DK</td><td>DK</td><td>DK</td></tr>\n");
4504 
4505 	for (i = 0; i < DSL_HASH_SIZE; i++) {
4506 		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
4507 			continue;
4508 		}
4509 		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
4510 			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
4511 					"0x%hx fflags 0x%x data 0x%lx udata %p\n",
4512 					dk, (unsigned long)dk->dk_kevent.ident,
4513 					_evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
4514 					dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
4515 					(void*)dk->dk_kevent.udata);
4516 			fprintf(debug_stream, "\t\t<ul>\n");
4517 			TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
4518 				ds = _dispatch_source_from_refs(dr);
4519 				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
4520 						"0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
4521 						ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
4522 						ds->ds_pending_data, ds->ds_pending_data_mask,
4523 						ds->ds_atomic_flags);
4524 				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
4525 					dispatch_queue_t dq = ds->do_targetq;
4526 					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
4527 							"0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
4528 							dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:"");
4529 				}
4530 			}
4531 			fprintf(debug_stream, "\t\t</ul>\n");
4532 			fprintf(debug_stream, "\t</li>\n");
4533 		}
4534 	}
4535 	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
4536 	fflush(debug_stream);
4537 	fclose(debug_stream);
4538 }
4539 
/*
 * Cancel handler for the debugger's listening source: closes the socket
 * whose descriptor was stored as the source context.
 *
 * context  the listening descriptor, stored as (void *)(long)fd
 */
static void
_dispatch_kevent_debugger2_cancel(void *context)
{
	int fd = (int)(long)context;

	// close() signals failure by returning -1, and errno is only meaningful
	// in that case. The previous test (ret != -1) reported a stale errno on
	// success and ignored real failures.
	if (close(fd) == -1) {
		(void)dispatch_assume_zero(errno);
	}
}
4550 
4551 static void
_dispatch_kevent_debugger(void * context DISPATCH_UNUSED)4552 _dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
4553 {
4554 	union {
4555 		struct sockaddr_in sa_in;
4556 		struct sockaddr sa;
4557 	} sa_u = {
4558 		.sa_in = {
4559 			.sin_family = AF_INET,
4560 			.sin_addr = { htonl(INADDR_LOOPBACK), },
4561 		},
4562 	};
4563 	dispatch_source_t ds;
4564 	const char *valstr;
4565 	int val, r, fd, sock_opt = 1;
4566 	socklen_t slen = sizeof(sa_u);
4567 
4568 	if (issetugid()) {
4569 		return;
4570 	}
4571 	valstr = getenv("LIBDISPATCH_DEBUGGER");
4572 	if (!valstr) {
4573 		return;
4574 	}
4575 	val = atoi(valstr);
4576 	if (val == 2) {
4577 		sa_u.sa_in.sin_addr.s_addr = 0;
4578 	}
4579 	fd = socket(PF_INET, SOCK_STREAM, 0);
4580 	if (fd == -1) {
4581 		(void)dispatch_assume_zero(errno);
4582 		return;
4583 	}
4584 	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
4585 			(socklen_t) sizeof sock_opt);
4586 	if (r == -1) {
4587 		(void)dispatch_assume_zero(errno);
4588 		goto out_bad;
4589 	}
4590 #if 0
4591 	r = fcntl(fd, F_SETFL, O_NONBLOCK);
4592 	if (r == -1) {
4593 		(void)dispatch_assume_zero(errno);
4594 		goto out_bad;
4595 	}
4596 #endif
4597 	r = bind(fd, &sa_u.sa, sizeof(sa_u));
4598 	if (r == -1) {
4599 		(void)dispatch_assume_zero(errno);
4600 		goto out_bad;
4601 	}
4602 	r = listen(fd, SOMAXCONN);
4603 	if (r == -1) {
4604 		(void)dispatch_assume_zero(errno);
4605 		goto out_bad;
4606 	}
4607 	r = getsockname(fd, &sa_u.sa, &slen);
4608 	if (r == -1) {
4609 		(void)dispatch_assume_zero(errno);
4610 		goto out_bad;
4611 	}
4612 
4613 	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
4614 			&_dispatch_mgr_q);
4615 	if (dispatch_assume(ds)) {
4616 		_dispatch_log("LIBDISPATCH: debug port: %hu",
4617 				(in_port_t)ntohs(sa_u.sa_in.sin_port));
4618 
4619 		/* ownership of fd transfers to ds */
4620 		dispatch_set_context(ds, (void *)(long)fd);
4621 		dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
4622 		dispatch_source_set_cancel_handler_f(ds,
4623 				_dispatch_kevent_debugger2_cancel);
4624 		dispatch_resume(ds);
4625 
4626 		return;
4627 	}
4628 out_bad:
4629 	close(fd);
4630 }
4631 
4632 #if HAVE_MACH
4633 
4634 #ifndef MACH_PORT_TYPE_SPREQUEST
4635 #define MACH_PORT_TYPE_SPREQUEST 0x40000000
4636 #endif
4637 
4638 DISPATCH_NOINLINE
4639 void
dispatch_debug_machport(mach_port_t name,const char * str)4640 dispatch_debug_machport(mach_port_t name, const char* str)
4641 {
4642 	mach_port_type_t type;
4643 	mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
4644 	unsigned int dnreqs = 0, dnrsiz;
4645 	kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
4646 	if (kr) {
4647 		_dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
4648 				kr, mach_error_string(kr), str);
4649 		return;
4650 	}
4651 	if (type & MACH_PORT_TYPE_SEND) {
4652 		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4653 				MACH_PORT_RIGHT_SEND, &ns));
4654 	}
4655 	if (type & MACH_PORT_TYPE_SEND_ONCE) {
4656 		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4657 				MACH_PORT_RIGHT_SEND_ONCE, &nso));
4658 	}
4659 	if (type & MACH_PORT_TYPE_DEAD_NAME) {
4660 		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4661 				MACH_PORT_RIGHT_DEAD_NAME, &nd));
4662 	}
4663 	if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
4664 		kr = mach_port_dnrequest_info(mach_task_self(), name, &dnrsiz, &dnreqs);
4665 		if (kr != KERN_INVALID_RIGHT) (void)dispatch_assume_zero(kr);
4666 	}
4667 	if (type & MACH_PORT_TYPE_RECEIVE) {
4668 		mach_port_status_t status = { .mps_pset = 0, };
4669 		mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
4670 		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4671 				MACH_PORT_RIGHT_RECEIVE, &nr));
4672 		(void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
4673 				name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
4674 		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4675 				"dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
4676 				"sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
4677 				"seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
4678 				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
4679 				status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
4680 				status.mps_srights ? "Y":"N", status.mps_sorights,
4681 				status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
4682 				status.mps_seqno, str);
4683 	} else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
4684 			MACH_PORT_TYPE_DEAD_NAME)) {
4685 		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4686 				"dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
4687 				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
4688 	} else {
4689 		_dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
4690 				str);
4691 	}
4692 }
4693 
4694 #endif // HAVE_MACH
4695 
4696 #endif // DISPATCH_DEBUG
4697