xref: /NextBSD/lib/libdispatch/src/io.c (revision 33da5adc555b3bc29986eeadca03829e4ad06b1e)
1 /*
2  * Copyright (c) 2009-2013 Apple Inc. All rights reserved.
3  *
4  * @APPLE_APACHE_LICENSE_HEADER_START@
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * @APPLE_APACHE_LICENSE_HEADER_END@
19  */
20 
21 #include "internal.h"
22 
/*
 * Per-descriptor debug tracing. Unless the build defines DISPATCH_IO_DEBUG
 * itself, it follows the library-wide DISPATCH_DEBUG setting.
 */
#if !defined(DISPATCH_IO_DEBUG)
#define DISPATCH_IO_DEBUG DISPATCH_DEBUG
#endif

#if !DISPATCH_IO_DEBUG
// Tracing off: the macro expands to nothing, so its arguments are not
// evaluated.
#define _dispatch_fd_debug(msg, fd, args...)
#else
// Tracing on: every message is prefixed with the descriptor in hex.
#define _dispatch_fd_debug(msg, fd, args...) \
	_dispatch_debug("fd[0x%x]: " msg, (fd), ##args)
#endif

/*
 * Reference counting for dispatch_data_t values. When USE_OBJC is set these
 * go through the _dispatch_objc_* helpers, otherwise through the regular
 * dispatch_retain/dispatch_release.
 */
#if !USE_OBJC
#define _dispatch_io_data_retain(x) dispatch_retain(x)
#define _dispatch_io_data_release(x) dispatch_release(x)
#else
#define _dispatch_io_data_retain(x) _dispatch_objc_retain(x)
#define _dispatch_io_data_release(x) _dispatch_objc_release(x)
#endif
41 
42 typedef void (^dispatch_fd_entry_init_callback_t)(dispatch_fd_entry_t fd_entry);
43 
44 DISPATCH_EXPORT DISPATCH_NOTHROW
45 void _dispatch_iocntl(uint32_t param, uint64_t value);
46 
47 static dispatch_operation_t _dispatch_operation_create(
48 		dispatch_op_direction_t direction, dispatch_io_t channel, off_t offset,
49 		size_t length, dispatch_data_t data, dispatch_queue_t queue,
50 		dispatch_io_handler_t handler);
51 static void _dispatch_operation_enqueue(dispatch_operation_t op,
52 		dispatch_op_direction_t direction, dispatch_data_t data);
53 static dispatch_source_t _dispatch_operation_timer(dispatch_queue_t tq,
54 		dispatch_operation_t op);
55 static inline void _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry);
56 static inline void _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry);
57 static void _dispatch_fd_entry_init_async(dispatch_fd_t fd,
58 		dispatch_fd_entry_init_callback_t completion_callback);
59 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,
60 		uintptr_t hash);
61 static dispatch_fd_entry_t _dispatch_fd_entry_create_with_path(
62 		dispatch_io_path_data_t path_data, dev_t dev, mode_t mode);
63 static int _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,
64 		dispatch_io_t channel);
65 static void _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
66 		dispatch_io_t channel);
67 static void _dispatch_stream_init(dispatch_fd_entry_t fd_entry,
68 		dispatch_queue_t tq);
69 static void _dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
70 		dispatch_op_direction_t direction);
71 static void _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev);
72 static void _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
73 		dispatch_operation_t operation, dispatch_data_t data);
74 static void _dispatch_disk_enqueue_operation(dispatch_disk_t dsk,
75 		dispatch_operation_t operation, dispatch_data_t data);
76 static void _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
77 		dispatch_io_t channel);
78 static void _dispatch_disk_cleanup_operations(dispatch_disk_t disk,
79 		dispatch_io_t channel);
80 static void _dispatch_stream_source_handler(void *ctx);
81 static void _dispatch_stream_queue_handler(void *ctx);
82 static void _dispatch_stream_handler(void *ctx);
83 static void _dispatch_disk_handler(void *ctx);
84 static void _dispatch_disk_perform(void *ctxt);
85 static void _dispatch_operation_advise(dispatch_operation_t op,
86 		size_t chunk_size);
87 static int _dispatch_operation_perform(dispatch_operation_t op);
88 static void _dispatch_operation_deliver_data(dispatch_operation_t op,
89 		dispatch_op_flags_t flags);
90 
/*
 * Syscall wrappers. `syscall_expr` must evaluate to -1 on failure with errno
 * set. The resulting error code (0 on success) is stored in `err_var` and
 * switched on. EINTR always re-evaluates `syscall_expr`; the remaining cases
 * come from the caller's trailing arguments.
 *
 *   _dispatch_io_syscall_switch_noerr: the caller provides every case,
 *       including `case 0` if success needs handling.
 *   _dispatch_io_syscall_switch: success (`case 0`) is a no-op; the caller
 *       provides the error cases.
 *   _dispatch_io_syscall: retries on EINTR, otherwise ignores the result.
 */
#define _dispatch_io_syscall_switch_noerr(err_var, syscall_expr, ...) do { \
		switch (((err_var) = (((syscall_expr) == -1) ? errno : 0))) { \
		case EINTR: continue; \
		__VA_ARGS__ \
		} \
		break; \
	} while (1)
#define _dispatch_io_syscall_switch(err_var, syscall_expr, ...) do { \
		_dispatch_io_syscall_switch_noerr(err_var, syscall_expr, \
		case 0: break; \
		__VA_ARGS__ \
		); \
	} while (0)
#define _dispatch_io_syscall(syscall_expr) do { int __err; \
		_dispatch_io_syscall_switch(__err, syscall_expr); \
	} while (0)

/*
 * Operation status codes (presumably the results of
 * _dispatch_operation_perform and friends — consumers are outside this view).
 */
enum {
	DISPATCH_OP_COMPLETE = 1,
	DISPATCH_OP_DELIVER = 2,
	DISPATCH_OP_DELIVER_AND_COMPLETE = 3,
	DISPATCH_OP_COMPLETE_RESUME = 4,
	DISPATCH_OP_RESUME = 5,
	DISPATCH_OP_ERR = 6,
	DISPATCH_OP_FD_ERR = 7,
};

/* Block_copy that hands back a value of the argument's own block type. */
#define _dispatch_io_Block_copy(x) \
		((typeof(x))_dispatch_Block_copy((dispatch_block_t)(x)))
121 
122 #pragma mark -
123 #pragma mark dispatch_io_hashtables
124 
125 #if TARGET_OS_EMBEDDED
126 #define DIO_HASH_SIZE  64u // must be a power of two
127 #else
128 #define DIO_HASH_SIZE 256u // must be a power of two
129 #endif
130 #define DIO_HASH(x) ((uintptr_t)(x) & (DIO_HASH_SIZE - 1))
131 
132 // Global hashtable of dev_t -> disk_s mappings
133 DISPATCH_CACHELINE_ALIGN
134 static TAILQ_HEAD(, dispatch_disk_s) _dispatch_io_devs[DIO_HASH_SIZE];
135 // Global hashtable of fd -> fd_entry_s mappings
136 DISPATCH_CACHELINE_ALIGN
137 static TAILQ_HEAD(, dispatch_fd_entry_s) _dispatch_io_fds[DIO_HASH_SIZE];
138 
139 static dispatch_once_t  _dispatch_io_devs_lockq_pred;
140 static dispatch_queue_t _dispatch_io_devs_lockq;
141 static dispatch_queue_t _dispatch_io_fds_lockq;
142 
143 static void
_dispatch_io_fds_lockq_init(void * context DISPATCH_UNUSED)144 _dispatch_io_fds_lockq_init(void *context DISPATCH_UNUSED)
145 {
146 	_dispatch_io_fds_lockq = dispatch_queue_create(
147 			"com.apple.libdispatch-io.fd_lockq", NULL);
148 	unsigned int i;
149 	for (i = 0; i < DIO_HASH_SIZE; i++) {
150 		TAILQ_INIT(&_dispatch_io_fds[i]);
151 	}
152 }
153 
154 static void
_dispatch_io_devs_lockq_init(void * context DISPATCH_UNUSED)155 _dispatch_io_devs_lockq_init(void *context DISPATCH_UNUSED)
156 {
157 	_dispatch_io_devs_lockq = dispatch_queue_create(
158 			"com.apple.libdispatch-io.dev_lockq", NULL);
159 	unsigned int i;
160 	for (i = 0; i < DIO_HASH_SIZE; i++) {
161 		TAILQ_INIT(&_dispatch_io_devs[i]);
162 	}
163 }
164 
165 #pragma mark -
166 #pragma mark dispatch_io_defaults
167 
168 enum {
169 	DISPATCH_IOCNTL_CHUNK_PAGES = 1,
170 	DISPATCH_IOCNTL_LOW_WATER_CHUNKS,
171 	DISPATCH_IOCNTL_INITIAL_DELIVERY,
172 	DISPATCH_IOCNTL_MAX_PENDING_IO_REQS,
173 };
174 
175 static struct dispatch_io_defaults_s {
176 	size_t chunk_pages, low_water_chunks, max_pending_io_reqs;
177 	bool initial_delivery;
178 } dispatch_io_defaults = {
179 	.chunk_pages = DIO_MAX_CHUNK_PAGES,
180 	.low_water_chunks = DIO_DEFAULT_LOW_WATER_CHUNKS,
181 	.max_pending_io_reqs = DIO_MAX_PENDING_IO_REQS,
182 };
183 
184 #define _dispatch_iocntl_set_default(p, v) do { \
185 		dispatch_io_defaults.p = (typeof(dispatch_io_defaults.p))(v); \
186 	} while (0)
187 
188 void
_dispatch_iocntl(uint32_t param,uint64_t value)189 _dispatch_iocntl(uint32_t param, uint64_t value)
190 {
191 	switch (param) {
192 	case DISPATCH_IOCNTL_CHUNK_PAGES:
193 		_dispatch_iocntl_set_default(chunk_pages, value);
194 		break;
195 	case DISPATCH_IOCNTL_LOW_WATER_CHUNKS:
196 		_dispatch_iocntl_set_default(low_water_chunks, value);
197 		break;
198 	case DISPATCH_IOCNTL_INITIAL_DELIVERY:
199 		_dispatch_iocntl_set_default(initial_delivery, value);
200 	case DISPATCH_IOCNTL_MAX_PENDING_IO_REQS:
201 		_dispatch_iocntl_set_default(max_pending_io_reqs, value);
202 		break;
203 	}
204 }
205 
206 #pragma mark -
207 #pragma mark dispatch_io_t
208 
209 static dispatch_io_t
_dispatch_io_create(dispatch_io_type_t type)210 _dispatch_io_create(dispatch_io_type_t type)
211 {
212 	dispatch_io_t channel = _dispatch_alloc(DISPATCH_VTABLE(io),
213 			sizeof(struct dispatch_io_s));
214 	channel->do_next = DISPATCH_OBJECT_LISTLESS;
215 	channel->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
216 			true);
217 	channel->params.type = type;
218 	channel->params.high = SIZE_MAX;
219 	channel->params.low = dispatch_io_defaults.low_water_chunks *
220 			dispatch_io_defaults.chunk_pages * PAGE_SIZE;
221 	channel->queue = dispatch_queue_create("com.apple.libdispatch-io.channelq",
222 			NULL);
223 	return channel;
224 }
225 
226 static void
227 _dispatch_io_init(dispatch_io_t channel, dispatch_fd_entry_t fd_entry,
228 		dispatch_queue_t queue, int err, void (^cleanup_handler)(int))
229 {
230 	// Enqueue the cleanup handler on the suspended close queue
231 	if (cleanup_handler) {
232 		_dispatch_retain(queue);
233 		dispatch_async(!err ? fd_entry->close_queue : channel->queue, ^{
234 			dispatch_async(queue, ^{
235 				_dispatch_fd_debug("cleanup handler invoke", -1);
236 				cleanup_handler(err);
237 			});
238 			_dispatch_release(queue);
239 		});
240 	}
241 	if (fd_entry) {
242 		channel->fd_entry = fd_entry;
243 		dispatch_retain(fd_entry->barrier_queue);
244 		dispatch_retain(fd_entry->barrier_group);
245 		channel->barrier_queue = fd_entry->barrier_queue;
246 		channel->barrier_group = fd_entry->barrier_group;
247 	} else {
248 		// Still need to create a barrier queue, since all operations go
249 		// through it
250 		channel->barrier_queue = dispatch_queue_create(
251 				"com.apple.libdispatch-io.barrierq", NULL);
252 		channel->barrier_group = dispatch_group_create();
253 	}
254 }
255 
256 void
_dispatch_io_dispose(dispatch_io_t channel)257 _dispatch_io_dispose(dispatch_io_t channel)
258 {
259 	_dispatch_object_debug(channel, "%s", __func__);
260 	if (channel->fd_entry &&
261 			!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
262 		if (channel->fd_entry->path_data) {
263 			// This modification is safe since path_data->channel is checked
264 			// only on close_queue (which is still suspended at this point)
265 			channel->fd_entry->path_data->channel = NULL;
266 		}
267 		// Cleanup handlers will only run when all channels related to this
268 		// fd are complete
269 		_dispatch_fd_entry_release(channel->fd_entry);
270 	}
271 	if (channel->queue) {
272 		dispatch_release(channel->queue);
273 	}
274 	if (channel->barrier_queue) {
275 		dispatch_release(channel->barrier_queue);
276 	}
277 	if (channel->barrier_group) {
278 		dispatch_release(channel->barrier_group);
279 	}
280 }
281 
282 static int
_dispatch_io_validate_type(dispatch_io_t channel,mode_t mode)283 _dispatch_io_validate_type(dispatch_io_t channel, mode_t mode)
284 {
285 	int err = 0;
286 	if (S_ISDIR(mode)) {
287 		err = EISDIR;
288 	} else if (channel->params.type == DISPATCH_IO_RANDOM &&
289 			(S_ISFIFO(mode) || S_ISSOCK(mode))) {
290 		err = ESPIPE;
291 	}
292 	return err;
293 }
294 
295 static int
_dispatch_io_get_error(dispatch_operation_t op,dispatch_io_t channel,bool ignore_closed)296 _dispatch_io_get_error(dispatch_operation_t op, dispatch_io_t channel,
297 		bool ignore_closed)
298 {
299 	// On _any_ queue
300 	int err;
301 	if (op) {
302 		channel = op->channel;
303 	}
304 	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
305 		if (!ignore_closed || channel->atomic_flags & DIO_STOPPED) {
306 			err = ECANCELED;
307 		} else {
308 			err = 0;
309 		}
310 	} else {
311 		err = op ? op->fd_entry->err : channel->err;
312 	}
313 	return err;
314 }
315 
316 #pragma mark -
317 #pragma mark dispatch_io_channels
318 
319 dispatch_io_t
320 dispatch_io_create(dispatch_io_type_t type, dispatch_fd_t fd,
321 		dispatch_queue_t queue, void (^cleanup_handler)(int))
322 {
323 	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
324 		return NULL;
325 	}
326 	_dispatch_fd_debug("io create", fd);
327 	dispatch_io_t channel = _dispatch_io_create(type);
328 	channel->fd = fd;
329 	channel->fd_actual = fd;
330 	dispatch_suspend(channel->queue);
331 	_dispatch_retain(queue);
332 	_dispatch_retain(channel);
333 	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
334 		// On barrier queue
335 		int err = fd_entry->err;
336 		if (!err) {
337 			err = _dispatch_io_validate_type(channel, fd_entry->stat.mode);
338 		}
339 		if (!err && type == DISPATCH_IO_RANDOM) {
340 			off_t f_ptr;
341 			_dispatch_io_syscall_switch_noerr(err,
342 				f_ptr = lseek(fd_entry->fd, 0, SEEK_CUR),
343 				case 0: channel->f_ptr = f_ptr; break;
344 				default: (void)dispatch_assume_zero(err); break;
345 			);
346 		}
347 		channel->err = err;
348 		_dispatch_fd_entry_retain(fd_entry);
349 		_dispatch_io_init(channel, fd_entry, queue, err, cleanup_handler);
350 		dispatch_resume(channel->queue);
351 		_dispatch_object_debug(channel, "%s", __func__);
352 		_dispatch_release(channel);
353 		_dispatch_release(queue);
354 	});
355 	_dispatch_object_debug(channel, "%s", __func__);
356 	return channel;
357 }
358 
359 dispatch_io_t
dispatch_io_create_f(dispatch_io_type_t type,dispatch_fd_t fd,dispatch_queue_t queue,void * context,void (* cleanup_handler)(void * context,int error))360 dispatch_io_create_f(dispatch_io_type_t type, dispatch_fd_t fd,
361 		dispatch_queue_t queue, void *context,
362 		void (*cleanup_handler)(void *context, int error))
363 {
364 	return dispatch_io_create(type, fd, queue, !cleanup_handler ? NULL :
365 			^(int error){ cleanup_handler(context, error); });
366 }
367 
368 dispatch_io_t
369 dispatch_io_create_with_path(dispatch_io_type_t type, const char *path,
370 		int oflag, mode_t mode, dispatch_queue_t queue,
371 		void (^cleanup_handler)(int error))
372 {
373 	if ((type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) ||
374 			!(path && *path == '/')) {
375 		return NULL;
376 	}
377 	size_t pathlen = strlen(path);
378 	dispatch_io_path_data_t path_data = malloc(sizeof(*path_data) + pathlen+1);
379 	if (!path_data) {
380 		return NULL;
381 	}
382 	_dispatch_fd_debug("io create with path %s", -1, path);
383 	dispatch_io_t channel = _dispatch_io_create(type);
384 	channel->fd = -1;
385 	channel->fd_actual = -1;
386 	path_data->channel = channel;
387 	path_data->oflag = oflag;
388 	path_data->mode = mode;
389 	path_data->pathlen = pathlen;
390 	memcpy(path_data->path, path, pathlen + 1);
391 	_dispatch_retain(queue);
392 	_dispatch_retain(channel);
393 	dispatch_async(channel->queue, ^{
394 		int err = 0;
395 		struct stat st;
396 		_dispatch_io_syscall_switch_noerr(err,
397 			(path_data->oflag & O_NOFOLLOW) == O_NOFOLLOW ||
398 					(path_data->oflag & O_SYMLINK) == O_SYMLINK ?
399 					lstat(path_data->path, &st) : stat(path_data->path, &st),
400 			case 0:
401 				err = _dispatch_io_validate_type(channel, st.st_mode);
402 				break;
403 			default:
404 				if ((path_data->oflag & O_CREAT) &&
405 						(*(path_data->path + path_data->pathlen - 1) != '/')) {
406 					// Check parent directory
407 					char *c = strrchr(path_data->path, '/');
408 					dispatch_assert(c);
409 					*c = 0;
410 					int perr;
411 					_dispatch_io_syscall_switch_noerr(perr,
412 						stat(path_data->path, &st),
413 						case 0:
414 							// Since the parent directory exists, open() will
415 							// create a regular file after the fd_entry has
416 							// been filled in
417 							st.st_mode = S_IFREG;
418 							err = 0;
419 							break;
420 					);
421 					*c = '/';
422 				}
423 				break;
424 		);
425 		channel->err = err;
426 		if (err) {
427 			free(path_data);
428 			_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
429 			_dispatch_release(channel);
430 			_dispatch_release(queue);
431 			return;
432 		}
433 		dispatch_suspend(channel->queue);
434 		dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
435 				_dispatch_io_devs_lockq_init);
436 		dispatch_async(_dispatch_io_devs_lockq, ^{
437 			dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create_with_path(
438 					path_data, st.st_dev, st.st_mode);
439 			_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
440 			dispatch_resume(channel->queue);
441 			_dispatch_object_debug(channel, "%s", __func__);
442 			_dispatch_release(channel);
443 			_dispatch_release(queue);
444 		});
445 	});
446 	_dispatch_object_debug(channel, "%s", __func__);
447 	return channel;
448 }
449 
450 dispatch_io_t
dispatch_io_create_with_path_f(dispatch_io_type_t type,const char * path,int oflag,mode_t mode,dispatch_queue_t queue,void * context,void (* cleanup_handler)(void * context,int error))451 dispatch_io_create_with_path_f(dispatch_io_type_t type, const char *path,
452 		int oflag, mode_t mode, dispatch_queue_t queue, void *context,
453 		void (*cleanup_handler)(void *context, int error))
454 {
455 	return dispatch_io_create_with_path(type, path, oflag, mode, queue,
456 			!cleanup_handler ? NULL :
457 			^(int error){ cleanup_handler(context, error); });
458 }
459 
460 dispatch_io_t
461 dispatch_io_create_with_io(dispatch_io_type_t type, dispatch_io_t in_channel,
462 		dispatch_queue_t queue, void (^cleanup_handler)(int error))
463 {
464 	if (type != DISPATCH_IO_STREAM && type != DISPATCH_IO_RANDOM) {
465 		return NULL;
466 	}
467 	_dispatch_fd_debug("io create with io %p", -1, in_channel);
468 	dispatch_io_t channel = _dispatch_io_create(type);
469 	dispatch_suspend(channel->queue);
470 	_dispatch_retain(queue);
471 	_dispatch_retain(channel);
472 	_dispatch_retain(in_channel);
473 	dispatch_async(in_channel->queue, ^{
474 		int err0 = _dispatch_io_get_error(NULL, in_channel, false);
475 		if (err0) {
476 			channel->err = err0;
477 			_dispatch_io_init(channel, NULL, queue, err0, cleanup_handler);
478 			dispatch_resume(channel->queue);
479 			_dispatch_release(channel);
480 			_dispatch_release(in_channel);
481 			_dispatch_release(queue);
482 			return;
483 		}
484 		dispatch_async(in_channel->barrier_queue, ^{
485 			int err = _dispatch_io_get_error(NULL, in_channel, false);
486 			// If there is no error, the fd_entry for the in_channel is valid.
487 			// Since we are running on in_channel's queue, the fd_entry has been
488 			// fully resolved and will stay valid for the duration of this block
489 			if (!err) {
490 				err = in_channel->err;
491 				if (!err) {
492 					err = in_channel->fd_entry->err;
493 				}
494 			}
495 			if (!err) {
496 				err = _dispatch_io_validate_type(channel,
497 						in_channel->fd_entry->stat.mode);
498 			}
499 			if (!err && type == DISPATCH_IO_RANDOM && in_channel->fd != -1) {
500 				off_t f_ptr;
501 				_dispatch_io_syscall_switch_noerr(err,
502 					f_ptr = lseek(in_channel->fd_entry->fd, 0, SEEK_CUR),
503 					case 0: channel->f_ptr = f_ptr; break;
504 					default: (void)dispatch_assume_zero(err); break;
505 				);
506 			}
507 			channel->err = err;
508 			if (err) {
509 				_dispatch_io_init(channel, NULL, queue, err, cleanup_handler);
510 				dispatch_resume(channel->queue);
511 				_dispatch_release(channel);
512 				_dispatch_release(in_channel);
513 				_dispatch_release(queue);
514 				return;
515 			}
516 			if (in_channel->fd == -1) {
517 				// in_channel was created from path
518 				channel->fd = -1;
519 				channel->fd_actual = -1;
520 				mode_t mode = in_channel->fd_entry->stat.mode;
521 				dev_t dev = in_channel->fd_entry->stat.dev;
522 				size_t path_data_len = sizeof(struct dispatch_io_path_data_s) +
523 						in_channel->fd_entry->path_data->pathlen + 1;
524 				dispatch_io_path_data_t path_data = malloc(path_data_len);
525 				memcpy(path_data, in_channel->fd_entry->path_data,
526 						path_data_len);
527 				path_data->channel = channel;
528 				// lockq_io_devs is known to already exist
529 				dispatch_async(_dispatch_io_devs_lockq, ^{
530 					dispatch_fd_entry_t fd_entry;
531 					fd_entry = _dispatch_fd_entry_create_with_path(path_data,
532 							dev, mode);
533 					_dispatch_io_init(channel, fd_entry, queue, 0,
534 							cleanup_handler);
535 					dispatch_resume(channel->queue);
536 					_dispatch_release(channel);
537 					_dispatch_release(queue);
538 				});
539 			} else {
540 				dispatch_fd_entry_t fd_entry = in_channel->fd_entry;
541 				channel->fd = in_channel->fd;
542 				channel->fd_actual = in_channel->fd_actual;
543 				_dispatch_fd_entry_retain(fd_entry);
544 				_dispatch_io_init(channel, fd_entry, queue, 0, cleanup_handler);
545 				dispatch_resume(channel->queue);
546 				_dispatch_release(channel);
547 				_dispatch_release(queue);
548 			}
549 			_dispatch_release(in_channel);
550 			_dispatch_object_debug(channel, "%s", __func__);
551 		});
552 	});
553 	_dispatch_object_debug(channel, "%s", __func__);
554 	return channel;
555 }
556 
557 dispatch_io_t
dispatch_io_create_with_io_f(dispatch_io_type_t type,dispatch_io_t in_channel,dispatch_queue_t queue,void * context,void (* cleanup_handler)(void * context,int error))558 dispatch_io_create_with_io_f(dispatch_io_type_t type, dispatch_io_t in_channel,
559 		dispatch_queue_t queue, void *context,
560 		void (*cleanup_handler)(void *context, int error))
561 {
562 	return dispatch_io_create_with_io(type, in_channel, queue,
563 			!cleanup_handler ? NULL :
564 			^(int error){ cleanup_handler(context, error); });
565 }
566 
567 #pragma mark -
568 #pragma mark dispatch_io_accessors
569 
570 void
dispatch_io_set_high_water(dispatch_io_t channel,size_t high_water)571 dispatch_io_set_high_water(dispatch_io_t channel, size_t high_water)
572 {
573 	_dispatch_retain(channel);
574 	dispatch_async(channel->queue, ^{
575 		_dispatch_fd_debug("io set high water", channel->fd);
576 		if (channel->params.low > high_water) {
577 			channel->params.low = high_water;
578 		}
579 		channel->params.high = high_water ? high_water : 1;
580 		_dispatch_release(channel);
581 	});
582 }
583 
584 void
dispatch_io_set_low_water(dispatch_io_t channel,size_t low_water)585 dispatch_io_set_low_water(dispatch_io_t channel, size_t low_water)
586 {
587 	_dispatch_retain(channel);
588 	dispatch_async(channel->queue, ^{
589 		_dispatch_fd_debug("io set low water", channel->fd);
590 		if (channel->params.high < low_water) {
591 			channel->params.high = low_water ? low_water : 1;
592 		}
593 		channel->params.low = low_water;
594 		_dispatch_release(channel);
595 	});
596 }
597 
598 void
dispatch_io_set_interval(dispatch_io_t channel,uint64_t interval,unsigned long flags)599 dispatch_io_set_interval(dispatch_io_t channel, uint64_t interval,
600 		unsigned long flags)
601 {
602 	_dispatch_retain(channel);
603 	dispatch_async(channel->queue, ^{
604 		_dispatch_fd_debug("io set interval", channel->fd);
605 		channel->params.interval = interval < INT64_MAX ? interval : INT64_MAX;
606 		channel->params.interval_flags = flags;
607 		_dispatch_release(channel);
608 	});
609 }
610 
611 void
_dispatch_io_set_target_queue(dispatch_io_t channel,dispatch_queue_t dq)612 _dispatch_io_set_target_queue(dispatch_io_t channel, dispatch_queue_t dq)
613 {
614 	_dispatch_retain(dq);
615 	_dispatch_retain(channel);
616 	dispatch_async(channel->queue, ^{
617 		dispatch_queue_t prev_dq = channel->do_targetq;
618 		channel->do_targetq = dq;
619 		_dispatch_release(prev_dq);
620 		_dispatch_object_debug(channel, "%s", __func__);
621 		_dispatch_release(channel);
622 	});
623 }
624 
625 dispatch_fd_t
dispatch_io_get_descriptor(dispatch_io_t channel)626 dispatch_io_get_descriptor(dispatch_io_t channel)
627 {
628 	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
629 		return -1;
630 	}
631 	dispatch_fd_t fd = channel->fd_actual;
632 	if (fd == -1 && _dispatch_thread_getspecific(dispatch_io_key) == channel &&
633 			!_dispatch_io_get_error(NULL, channel, false)) {
634 		dispatch_fd_entry_t fd_entry = channel->fd_entry;
635 		(void)_dispatch_fd_entry_open(fd_entry, channel);
636 	}
637 	return channel->fd_actual;
638 }
639 
640 #pragma mark -
641 #pragma mark dispatch_io_operations
642 
643 static void
_dispatch_io_stop(dispatch_io_t channel)644 _dispatch_io_stop(dispatch_io_t channel)
645 {
646 	_dispatch_fd_debug("io stop", channel->fd);
647 	(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_STOPPED, relaxed);
648 	_dispatch_retain(channel);
649 	dispatch_async(channel->queue, ^{
650 		dispatch_async(channel->barrier_queue, ^{
651 			_dispatch_object_debug(channel, "%s", __func__);
652 			dispatch_fd_entry_t fd_entry = channel->fd_entry;
653 			if (fd_entry) {
654 				_dispatch_fd_debug("io stop cleanup", channel->fd);
655 				_dispatch_fd_entry_cleanup_operations(fd_entry, channel);
656 				if (!(channel->atomic_flags & DIO_CLOSED)) {
657 					channel->fd_entry = NULL;
658 					_dispatch_fd_entry_release(fd_entry);
659 				}
660 			} else if (channel->fd != -1) {
661 				// Stop after close, need to check if fd_entry still exists
662 				_dispatch_retain(channel);
663 				dispatch_async(_dispatch_io_fds_lockq, ^{
664 					_dispatch_object_debug(channel, "%s", __func__);
665 					_dispatch_fd_debug("io stop after close cleanup",
666 							channel->fd);
667 					dispatch_fd_entry_t fdi;
668 					uintptr_t hash = DIO_HASH(channel->fd);
669 					TAILQ_FOREACH(fdi, &_dispatch_io_fds[hash], fd_list) {
670 						if (fdi->fd == channel->fd) {
671 							_dispatch_fd_entry_cleanup_operations(fdi, channel);
672 							break;
673 						}
674 					}
675 					_dispatch_release(channel);
676 				});
677 			}
678 			_dispatch_release(channel);
679 		});
680 	});
681 }
682 
683 void
dispatch_io_close(dispatch_io_t channel,unsigned long flags)684 dispatch_io_close(dispatch_io_t channel, unsigned long flags)
685 {
686 	if (flags & DISPATCH_IO_STOP) {
687 		// Don't stop an already stopped channel
688 		if (channel->atomic_flags & DIO_STOPPED) {
689 			return;
690 		}
691 		return _dispatch_io_stop(channel);
692 	}
693 	// Don't close an already closed or stopped channel
694 	if (channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED)) {
695 		return;
696 	}
697 	_dispatch_retain(channel);
698 	dispatch_async(channel->queue, ^{
699 		dispatch_async(channel->barrier_queue, ^{
700 			_dispatch_object_debug(channel, "%s", __func__);
701 			_dispatch_fd_debug("io close", channel->fd);
702 			if (!(channel->atomic_flags & (DIO_CLOSED|DIO_STOPPED))) {
703 				(void)dispatch_atomic_or2o(channel, atomic_flags, DIO_CLOSED,
704 						relaxed);
705 				dispatch_fd_entry_t fd_entry = channel->fd_entry;
706 				if (fd_entry) {
707 					if (!fd_entry->path_data) {
708 						channel->fd_entry = NULL;
709 					}
710 					_dispatch_fd_entry_release(fd_entry);
711 				}
712 			}
713 			_dispatch_release(channel);
714 		});
715 	});
716 }
717 
718 void
dispatch_io_barrier(dispatch_io_t channel,dispatch_block_t barrier)719 dispatch_io_barrier(dispatch_io_t channel, dispatch_block_t barrier)
720 {
721 	_dispatch_retain(channel);
722 	dispatch_async(channel->queue, ^{
723 		dispatch_queue_t io_q = channel->do_targetq;
724 		dispatch_queue_t barrier_queue = channel->barrier_queue;
725 		dispatch_group_t barrier_group = channel->barrier_group;
726 		dispatch_async(barrier_queue, ^{
727 			dispatch_suspend(barrier_queue);
728 			dispatch_group_notify(barrier_group, io_q, ^{
729 				_dispatch_object_debug(channel, "%s", __func__);
730 				_dispatch_thread_setspecific(dispatch_io_key, channel);
731 				barrier();
732 				_dispatch_thread_setspecific(dispatch_io_key, NULL);
733 				dispatch_resume(barrier_queue);
734 				_dispatch_release(channel);
735 			});
736 		});
737 	});
738 }
739 
740 void
dispatch_io_barrier_f(dispatch_io_t channel,void * context,dispatch_function_t barrier)741 dispatch_io_barrier_f(dispatch_io_t channel, void *context,
742 		dispatch_function_t barrier)
743 {
744 	return dispatch_io_barrier(channel, ^{ barrier(context); });
745 }
746 
747 void
dispatch_io_read(dispatch_io_t channel,off_t offset,size_t length,dispatch_queue_t queue,dispatch_io_handler_t handler)748 dispatch_io_read(dispatch_io_t channel, off_t offset, size_t length,
749 		dispatch_queue_t queue, dispatch_io_handler_t handler)
750 {
751 	_dispatch_retain(channel);
752 	_dispatch_retain(queue);
753 	dispatch_async(channel->queue, ^{
754 		dispatch_operation_t op;
755 		op = _dispatch_operation_create(DOP_DIR_READ, channel, offset,
756 				length, dispatch_data_empty, queue, handler);
757 		if (op) {
758 			dispatch_queue_t barrier_q = channel->barrier_queue;
759 			dispatch_async(barrier_q, ^{
760 				_dispatch_operation_enqueue(op, DOP_DIR_READ,
761 						dispatch_data_empty);
762 			});
763 		}
764 		_dispatch_release(channel);
765 		_dispatch_release(queue);
766 	});
767 }
768 
769 void
dispatch_io_read_f(dispatch_io_t channel,off_t offset,size_t length,dispatch_queue_t queue,void * context,dispatch_io_handler_function_t handler)770 dispatch_io_read_f(dispatch_io_t channel, off_t offset, size_t length,
771 		dispatch_queue_t queue, void *context,
772 		dispatch_io_handler_function_t handler)
773 {
774 	return dispatch_io_read(channel, offset, length, queue,
775 			^(bool done, dispatch_data_t d, int error){
776 		handler(context, done, d, error);
777 	});
778 }
779 
780 void
dispatch_io_write(dispatch_io_t channel,off_t offset,dispatch_data_t data,dispatch_queue_t queue,dispatch_io_handler_t handler)781 dispatch_io_write(dispatch_io_t channel, off_t offset, dispatch_data_t data,
782 		dispatch_queue_t queue, dispatch_io_handler_t handler)
783 {
784 	_dispatch_io_data_retain(data);
785 	_dispatch_retain(channel);
786 	_dispatch_retain(queue);
787 	dispatch_async(channel->queue, ^{
788 		dispatch_operation_t op;
789 		op = _dispatch_operation_create(DOP_DIR_WRITE, channel, offset,
790 				dispatch_data_get_size(data), data, queue, handler);
791 		if (op) {
792 			dispatch_queue_t barrier_q = channel->barrier_queue;
793 			dispatch_async(barrier_q, ^{
794 				_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
795 				_dispatch_io_data_release(data);
796 			});
797 		} else {
798 			_dispatch_io_data_release(data);
799 		}
800 		_dispatch_release(channel);
801 		_dispatch_release(queue);
802 	});
803 }
804 
805 void
dispatch_io_write_f(dispatch_io_t channel,off_t offset,dispatch_data_t data,dispatch_queue_t queue,void * context,dispatch_io_handler_function_t handler)806 dispatch_io_write_f(dispatch_io_t channel, off_t offset, dispatch_data_t data,
807 		dispatch_queue_t queue, void *context,
808 		dispatch_io_handler_function_t handler)
809 {
810 	return dispatch_io_write(channel, offset, data, queue,
811 			^(bool done, dispatch_data_t d, int error){
812 		handler(context, done, d, error);
813 	});
814 }
815 
/*
 * Convenience one-shot read: read up to `length` bytes from `fd` and hand the
 * accumulated data plus the final error code (0 on success) to `handler` on
 * `queue`, exactly once.
 *
 * The read goes through a DISPATCH_IO_STREAM "convenience channel" that is
 * created lazily and cached in fd_entry->convenience_channel, so it is
 * shared by every dispatch_read()/dispatch_write() on the same fd_entry.
 */
void
dispatch_read(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
		void (^handler)(dispatch_data_t, int))
{
	// Keep `queue` alive until the handler has been submitted to it. Released
	// on the error path below, or from the close_queue block.
	_dispatch_retain(queue);
	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
		// On barrier queue
		if (fd_entry->err) {
			// fd setup failed (e.g. fstat() error): report it with empty data
			int err = fd_entry->err;
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(dispatch_data_empty, err);
			});
			_dispatch_release(queue);
			return;
		}
		// Safe to access fd_entry on barrier queue
		dispatch_io_t channel = fd_entry->convenience_channel;
		if (!channel) {
			// First convenience call on this fd: build a stream channel that
			// shares (and retains) the fd_entry's barrier queue and group
			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
			channel->fd = fd;
			channel->fd_actual = fd;
			channel->fd_entry = fd_entry;
			dispatch_retain(fd_entry->barrier_queue);
			dispatch_retain(fd_entry->barrier_group);
			channel->barrier_queue = fd_entry->barrier_queue;
			channel->barrier_group = fd_entry->barrier_group;
			fd_entry->convenience_channel = channel;
		}
		// Accumulated result: written by the operation handler below and
		// consumed (then released) by the close_queue block
		__block dispatch_data_t deliver_data = dispatch_data_empty;
		__block int err = 0;
		// close_queue stays suspended while the fd_entry is retained
		// (_dispatch_fd_entry_retain suspends it), so this block, and with it
		// the user handler, runs only after the read operation has dropped
		// its fd_entry reference in _dispatch_operation_dispose().
		dispatch_async(fd_entry->close_queue, ^{
			dispatch_async(queue, ^{
				_dispatch_fd_debug("convenience handler invoke", fd);
				handler(deliver_data, err);
				_dispatch_io_data_release(deliver_data);
			});
			_dispatch_release(queue);
		});
		// Concatenate every chunk the operation reports; record the error
		// code only once the operation says it is done
		dispatch_operation_t op =
			_dispatch_operation_create(DOP_DIR_READ, channel, 0,
					length, dispatch_data_empty,
					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
					^(bool done, dispatch_data_t data, int error) {
				if (data) {
					data = dispatch_data_create_concat(deliver_data, data);
					_dispatch_io_data_release(deliver_data);
					deliver_data = data;
				}
				if (done) {
					err = error;
				}
			});
		// NULL means _dispatch_operation_create() already scheduled the
		// handler itself (channel error or zero length)
		if (op) {
			_dispatch_operation_enqueue(op, DOP_DIR_READ, dispatch_data_empty);
		}
	});
}
874 
875 void
dispatch_read_f(dispatch_fd_t fd,size_t length,dispatch_queue_t queue,void * context,void (* handler)(void *,dispatch_data_t,int))876 dispatch_read_f(dispatch_fd_t fd, size_t length, dispatch_queue_t queue,
877 		void *context, void (*handler)(void *, dispatch_data_t, int))
878 {
879 	return dispatch_read(fd, length, queue, ^(dispatch_data_t d, int error){
880 		handler(context, d, error);
881 	});
882 }
883 
884 void
885 dispatch_write(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
886 		void (^handler)(dispatch_data_t, int))
887 {
888 	_dispatch_io_data_retain(data);
889 	_dispatch_retain(queue);
890 	_dispatch_fd_entry_init_async(fd, ^(dispatch_fd_entry_t fd_entry) {
891 		// On barrier queue
892 		if (fd_entry->err) {
893 			int err = fd_entry->err;
894 			dispatch_async(queue, ^{
895 				_dispatch_fd_debug("convenience handler invoke", fd);
896 				handler(NULL, err);
897 			});
898 			_dispatch_release(queue);
899 			return;
900 		}
901 		// Safe to access fd_entry on barrier queue
902 		dispatch_io_t channel = fd_entry->convenience_channel;
903 		if (!channel) {
904 			channel = _dispatch_io_create(DISPATCH_IO_STREAM);
905 			channel->fd = fd;
906 			channel->fd_actual = fd;
907 			channel->fd_entry = fd_entry;
908 			dispatch_retain(fd_entry->barrier_queue);
909 			dispatch_retain(fd_entry->barrier_group);
910 			channel->barrier_queue = fd_entry->barrier_queue;
911 			channel->barrier_group = fd_entry->barrier_group;
912 			fd_entry->convenience_channel = channel;
913 		}
914 		__block dispatch_data_t deliver_data = NULL;
915 		__block int err = 0;
916 		dispatch_async(fd_entry->close_queue, ^{
917 			dispatch_async(queue, ^{
918 				_dispatch_fd_debug("convenience handler invoke", fd);
919 				handler(deliver_data, err);
920 				if (deliver_data) {
921 					_dispatch_io_data_release(deliver_data);
922 				}
923 			});
924 			_dispatch_release(queue);
925 		});
926 		dispatch_operation_t op =
927 			_dispatch_operation_create(DOP_DIR_WRITE, channel, 0,
928 					dispatch_data_get_size(data), data,
929 					_dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,false),
930 					^(bool done, dispatch_data_t d, int error) {
931 				if (done) {
932 					if (d) {
933 						_dispatch_io_data_retain(d);
934 						deliver_data = d;
935 					}
936 					err = error;
937 				}
938 			});
939 		if (op) {
940 			_dispatch_operation_enqueue(op, DOP_DIR_WRITE, data);
941 		}
942 		_dispatch_io_data_release(data);
943 	});
944 }
945 
946 void
dispatch_write_f(dispatch_fd_t fd,dispatch_data_t data,dispatch_queue_t queue,void * context,void (* handler)(void *,dispatch_data_t,int))947 dispatch_write_f(dispatch_fd_t fd, dispatch_data_t data, dispatch_queue_t queue,
948 		void *context, void (*handler)(void *, dispatch_data_t, int))
949 {
950 	return dispatch_write(fd, data, queue, ^(dispatch_data_t d, int error){
951 		handler(context, d, error);
952 	});
953 }
954 
955 #pragma mark -
956 #pragma mark dispatch_operation_t
957 
/*
 * Allocate and initialize an I/O operation on `channel`.
 *
 * If the channel already reports an error, or `length` is 0, no operation is
 * created. Instead `handler` is scheduled on `queue` (via the channel's
 * barrier queue) with done == true, and NULL is returned. The data argument
 * is NULL for a read that failed and for a write that did not fail; in all
 * other cases it is `data`.
 *
 * Otherwise returns a new operation that holds references to `queue` and
 * `channel` and a copy of `handler`. Callers in this file then pass it to
 * _dispatch_operation_enqueue().
 */
static dispatch_operation_t
_dispatch_operation_create(dispatch_op_direction_t direction,
		dispatch_io_t channel, off_t offset, size_t length,
		dispatch_data_t data, dispatch_queue_t queue,
		dispatch_io_handler_t handler)
{
	// On channel queue
	dispatch_assert(direction < DOP_DIR_MAX);
	_dispatch_fd_debug("operation create", channel->fd);
#if DISPATCH_IO_DEBUG
	// Only needed by the debug log in the early-completion block below
	int fd = channel->fd;
#endif
	// Safe to call _dispatch_io_get_error() with channel->fd_entry since
	// that can only be NULL if atomic_flags are set rdar://problem/8362514
	int err = _dispatch_io_get_error(NULL, channel, false);
	if (err || !length) {
		// Complete immediately. `data` and `queue` are retained for the two
		// asynchronous hops and released once the handler has run / been
		// submitted.
		_dispatch_io_data_retain(data);
		_dispatch_retain(queue);
		dispatch_async(channel->barrier_queue, ^{
			dispatch_async(queue, ^{
				dispatch_data_t d = data;
				if (direction == DOP_DIR_READ && err) {
					d = NULL;
				} else if (direction == DOP_DIR_WRITE && !err) {
					d = NULL;
				}
				_dispatch_fd_debug("IO handler invoke", fd);
				handler(true, d, err);
				_dispatch_io_data_release(data);
			});
			_dispatch_release(queue);
		});
		return NULL;
	}
	dispatch_operation_t op = _dispatch_alloc(DISPATCH_VTABLE(operation),
			sizeof(struct dispatch_operation_s));
	op->do_next = DISPATCH_OBJECT_LISTLESS;
	op->do_xref_cnt = -1; // operation object is not exposed externally
	// Serial queue for this operation's callbacks, targeting the caller's
	// queue (which is retained)
	op->op_q = dispatch_queue_create("com.apple.libdispatch-io.opq", NULL);
	op->op_q->do_targetq = queue;
	_dispatch_retain(queue);
	op->active = false;
	op->direction = direction;
	// `offset` is relative to the channel's current file pointer
	op->offset = offset + channel->f_ptr;
	op->length = length;
	op->handler = _dispatch_io_Block_copy(handler);
	_dispatch_retain(channel);
	op->channel = channel;
	// Snapshot of the channel parameters at creation time
	op->params = channel->params;
	// Take a snapshot of the priority of the channel queue. The actual I/O
	// for this operation will be performed at this priority
	dispatch_queue_t targetq = op->channel->do_targetq;
	while (fastpath(targetq->do_targetq)) {
		// Walk to the last queue in the channel's target-queue chain
		targetq = targetq->do_targetq;
	}
	op->do_targetq = targetq;
	_dispatch_object_debug(op, "%s", __func__);
	return op;
}
1017 
/*
 * Dispose hook for dispatch_operation_t, run when its last reference is
 * released.
 *
 * If the operation was enqueued (op->fd_entry is set), any remaining data is
 * delivered with DOP_DONE, the fd_entry's barrier group is left and the
 * fd_entry reference is dropped. Every other resource owned by the operation
 * is then released.
 */
void
_dispatch_operation_dispose(dispatch_operation_t op)
{
	_dispatch_object_debug(op, "%s", __func__);
	// Deliver the data if there's any
	if (op->fd_entry) {
		_dispatch_operation_deliver_data(op, DOP_DONE);
		// Balances dispatch_group_enter() in _dispatch_operation_enqueue()
		dispatch_group_leave(op->fd_entry->barrier_group);
		// Balances _dispatch_fd_entry_retain() in _dispatch_operation_enqueue();
		// resumes the entry's close queue
		_dispatch_fd_entry_release(op->fd_entry);
	}
	if (op->channel) {
		_dispatch_release(op->channel);
	}
	if (op->timer) {
		dispatch_release(op->timer);
	}
	// For write operations, op->buf is owned by op->buf_data
	if (op->buf && op->direction == DOP_DIR_READ) {
		free(op->buf);
	}
	if (op->buf_data) {
		_dispatch_io_data_release(op->buf_data);
	}
	if (op->data) {
		_dispatch_io_data_release(op->data);
	}
	if (op->op_q) {
		dispatch_release(op->op_q);
	}
	// Copied with _dispatch_io_Block_copy() in _dispatch_operation_create()
	Block_release(op->handler);
}
1049 
1050 static void
_dispatch_operation_enqueue(dispatch_operation_t op,dispatch_op_direction_t direction,dispatch_data_t data)1051 _dispatch_operation_enqueue(dispatch_operation_t op,
1052 		dispatch_op_direction_t direction, dispatch_data_t data)
1053 {
1054 	// Called from the barrier queue
1055 	_dispatch_io_data_retain(data);
1056 	// If channel is closed or stopped, then call the handler immediately
1057 	int err = _dispatch_io_get_error(NULL, op->channel, false);
1058 	if (err) {
1059 		dispatch_io_handler_t handler = op->handler;
1060 		dispatch_async(op->op_q, ^{
1061 			dispatch_data_t d = data;
1062 			if (direction == DOP_DIR_READ && err) {
1063 				d = NULL;
1064 			} else if (direction == DOP_DIR_WRITE && !err) {
1065 				d = NULL;
1066 			}
1067 			handler(true, d, err);
1068 			_dispatch_io_data_release(data);
1069 		});
1070 		_dispatch_release(op);
1071 		return;
1072 	}
1073 	// Finish operation init
1074 	op->fd_entry = op->channel->fd_entry;
1075 	_dispatch_fd_entry_retain(op->fd_entry);
1076 	dispatch_group_enter(op->fd_entry->barrier_group);
1077 	dispatch_disk_t disk = op->fd_entry->disk;
1078 	if (!disk) {
1079 		dispatch_stream_t stream = op->fd_entry->streams[direction];
1080 		dispatch_async(stream->dq, ^{
1081 			_dispatch_stream_enqueue_operation(stream, op, data);
1082 			_dispatch_io_data_release(data);
1083 		});
1084 	} else {
1085 		dispatch_async(disk->pick_queue, ^{
1086 			_dispatch_disk_enqueue_operation(disk, op, data);
1087 			_dispatch_io_data_release(data);
1088 		});
1089 	}
1090 }
1091 
1092 static bool
_dispatch_operation_should_enqueue(dispatch_operation_t op,dispatch_queue_t tq,dispatch_data_t data)1093 _dispatch_operation_should_enqueue(dispatch_operation_t op,
1094 		dispatch_queue_t tq, dispatch_data_t data)
1095 {
1096 	// On stream queue or disk queue
1097 	_dispatch_fd_debug("enqueue operation", op->fd_entry->fd);
1098 	_dispatch_io_data_retain(data);
1099 	op->data = data;
1100 	int err = _dispatch_io_get_error(op, NULL, true);
1101 	if (err) {
1102 		op->err = err;
1103 		// Final release
1104 		_dispatch_release(op);
1105 		return false;
1106 	}
1107 	if (op->params.interval) {
1108 		dispatch_resume(_dispatch_operation_timer(tq, op));
1109 	}
1110 	return true;
1111 }
1112 
1113 static dispatch_source_t
_dispatch_operation_timer(dispatch_queue_t tq,dispatch_operation_t op)1114 _dispatch_operation_timer(dispatch_queue_t tq, dispatch_operation_t op)
1115 {
1116 	// On stream queue or pick queue
1117 	if (op->timer) {
1118 		return op->timer;
1119 	}
1120 	dispatch_source_t timer = dispatch_source_create(
1121 			DISPATCH_SOURCE_TYPE_TIMER, 0, 0, tq);
1122 	dispatch_source_set_timer(timer, dispatch_time(DISPATCH_TIME_NOW,
1123 			(int64_t)op->params.interval), op->params.interval, 0);
1124 	dispatch_source_set_event_handler(timer, ^{
1125 		// On stream queue or pick queue
1126 		if (dispatch_source_testcancel(timer)) {
1127 			// Do nothing. The operation has already completed
1128 			return;
1129 		}
1130 		dispatch_op_flags_t flags = DOP_DEFAULT;
1131 		if (op->params.interval_flags & DISPATCH_IO_STRICT_INTERVAL) {
1132 			// Deliver even if there is less data than the low-water mark
1133 			flags |= DOP_DELIVER;
1134 		}
1135 		// If the operation is active, dont deliver data
1136 		if ((op->active) && (flags & DOP_DELIVER)) {
1137 			op->flags = flags;
1138 		} else {
1139 			_dispatch_operation_deliver_data(op, flags);
1140 		}
1141 	});
1142 	op->timer = timer;
1143 	return op->timer;
1144 }
1145 
1146 #pragma mark -
1147 #pragma mark dispatch_fd_entry_t
1148 
1149 #if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1150 static void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)1151 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)
1152 {
1153 	guardid_t guard = fd_entry;
1154 	const unsigned int guard_flags = GUARD_CLOSE;
1155 	int err, fd_flags = 0;
1156 	_dispatch_io_syscall_switch_noerr(err,
1157 		change_fdguard_np(fd_entry->fd, NULL, 0, &guard, guard_flags,
1158 				&fd_flags),
1159 		case 0:
1160 			fd_entry->guard_flags = guard_flags;
1161 			fd_entry->orig_fd_flags = fd_flags;
1162 			break;
1163 		case EPERM: break;
1164 		default: (void)dispatch_assume_zero(err); break;
1165 	);
1166 }
1167 
1168 static void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)1169 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)
1170 {
1171 	if (!fd_entry->guard_flags) {
1172 		return;
1173 	}
1174 	guardid_t guard = fd_entry;
1175 	int err, fd_flags = fd_entry->orig_fd_flags;
1176 	_dispatch_io_syscall_switch(err,
1177 		change_fdguard_np(fd_entry->fd, &guard, fd_entry->guard_flags, NULL, 0,
1178 				&fd_flags),
1179 		default: (void)dispatch_assume_zero(err); break;
1180 	);
1181 }
1182 #else
1183 static inline void
_dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry)1184 _dispatch_fd_entry_guard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1185 static inline void
_dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry)1186 _dispatch_fd_entry_unguard(dispatch_fd_entry_t fd_entry) { (void)fd_entry; }
1187 #endif // DISPATCH_USE_GUARDED_FD
1188 
1189 static inline int
_dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry,const char * path,int oflag,mode_t mode)1190 _dispatch_fd_entry_guarded_open(dispatch_fd_entry_t fd_entry, const char *path,
1191 		int oflag, mode_t mode) {
1192 #if DISPATCH_USE_GUARDED_FD
1193 	guardid_t guard = (uintptr_t)fd_entry;
1194 	const unsigned int guard_flags = GUARD_CLOSE | GUARD_DUP |
1195 			GUARD_SOCKET_IPC | GUARD_FILEPORT;
1196 	int fd = guarded_open_np(path, &guard, guard_flags, oflag | O_CLOEXEC,
1197 			mode);
1198 	if (fd != -1) {
1199 		fd_entry->guard_flags = guard_flags;
1200 		return fd;
1201 	}
1202 	errno = 0;
1203 #endif
1204 	return open(path, oflag, mode);
1205 	(void)fd_entry;
1206 }
1207 
1208 static inline int
_dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry,int fd)1209 _dispatch_fd_entry_guarded_close(dispatch_fd_entry_t fd_entry, int fd) {
1210 #if DISPATCH_USE_GUARDED_FD
1211 	if (fd_entry->guard_flags) {
1212 		guardid_t guard = (uintptr_t)fd_entry;
1213 		return guarded_close_np(fd, &guard);
1214 	} else
1215 #endif
1216 	{
1217 		return close(fd);
1218 	}
1219 	(void)fd_entry;
1220 }
1221 
1222 static inline void
_dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry)1223 _dispatch_fd_entry_retain(dispatch_fd_entry_t fd_entry) {
1224 	dispatch_suspend(fd_entry->close_queue);
1225 }
1226 
1227 static inline void
_dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry)1228 _dispatch_fd_entry_release(dispatch_fd_entry_t fd_entry) {
1229 	dispatch_resume(fd_entry->close_queue);
1230 }
1231 
/*
 * Find (or create) the fd_entry for `fd` and invoke `completion_callback`
 * with it on the entry's barrier queue.
 *
 * The lookup runs on the global fds lock queue, which is initialized on
 * first use. The entry is held (close queue suspended) across the hop to
 * the barrier queue:
 *   - an existing entry is retained explicitly;
 *   - a new entry comes back from _dispatch_fd_entry_create() already
 *     suspended once.
 * Either way, the release after the callback balances it.
 *
 * _dispatch_fd_entry_create_with_fd() queues the fstat()/setup block on the
 * (serial) barrier queue first, so the callback always runs after that setup.
 */
static void
_dispatch_fd_entry_init_async(dispatch_fd_t fd,
		dispatch_fd_entry_init_callback_t completion_callback)
{
	static dispatch_once_t _dispatch_io_fds_lockq_pred;
	dispatch_once_f(&_dispatch_io_fds_lockq_pred, NULL,
			_dispatch_io_fds_lockq_init);
	dispatch_async(_dispatch_io_fds_lockq, ^{
		_dispatch_fd_debug("fd entry init", fd);
		dispatch_fd_entry_t fd_entry = NULL;
		// Check to see if there is an existing entry for the given fd
		// (TAILQ_FOREACH leaves fd_entry NULL when the loop finds nothing)
		uintptr_t hash = DIO_HASH(fd);
		TAILQ_FOREACH(fd_entry, &_dispatch_io_fds[hash], fd_list) {
			if (fd_entry->fd == fd) {
				// Retain the fd_entry to ensure it cannot go away until the
				// stat() has completed
				_dispatch_fd_entry_retain(fd_entry);
				break;
			}
		}
		if (!fd_entry) {
			// If we did not find an existing entry, create one
			fd_entry = _dispatch_fd_entry_create_with_fd(fd, hash);
		}
		dispatch_async(fd_entry->barrier_queue, ^{
			_dispatch_fd_debug("fd entry init completion", fd);
			completion_callback(fd_entry);
			// stat() is complete, release reference to fd_entry
			_dispatch_fd_entry_release(fd_entry);
		});
	});
}
1264 
1265 static dispatch_fd_entry_t
_dispatch_fd_entry_create(dispatch_queue_t q)1266 _dispatch_fd_entry_create(dispatch_queue_t q)
1267 {
1268 	dispatch_fd_entry_t fd_entry;
1269 	fd_entry = _dispatch_calloc(1ul, sizeof(struct dispatch_fd_entry_s));
1270 	fd_entry->close_queue = dispatch_queue_create(
1271 			"com.apple.libdispatch-io.closeq", NULL);
1272 	// Use target queue to ensure that no concurrent lookups are going on when
1273 	// the close queue is running
1274 	fd_entry->close_queue->do_targetq = q;
1275 	_dispatch_retain(q);
1276 	// Suspend the cleanup queue until closing
1277 	_dispatch_fd_entry_retain(fd_entry);
1278 	return fd_entry;
1279 }
1280 
1281 static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_fd(dispatch_fd_t fd,uintptr_t hash)1282 _dispatch_fd_entry_create_with_fd(dispatch_fd_t fd, uintptr_t hash)
1283 {
1284 	// On fds lock queue
1285 	_dispatch_fd_debug("fd entry create", fd);
1286 	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
1287 			_dispatch_io_fds_lockq);
1288 	fd_entry->fd = fd;
1289 	TAILQ_INSERT_TAIL(&_dispatch_io_fds[hash], fd_entry, fd_list);
1290 	fd_entry->barrier_queue = dispatch_queue_create(
1291 			"com.apple.libdispatch-io.barrierq", NULL);
1292 	fd_entry->barrier_group = dispatch_group_create();
1293 	dispatch_async(fd_entry->barrier_queue, ^{
1294 		_dispatch_fd_debug("fd entry stat", fd);
1295 		int err, orig_flags, orig_nosigpipe = -1;
1296 		struct stat st;
1297 		_dispatch_io_syscall_switch(err,
1298 			fstat(fd, &st),
1299 			default: fd_entry->err = err; return;
1300 		);
1301 		fd_entry->stat.dev = st.st_dev;
1302 		fd_entry->stat.mode = st.st_mode;
1303 		_dispatch_fd_entry_guard(fd_entry);
1304 		_dispatch_io_syscall_switch(err,
1305 			orig_flags = fcntl(fd, F_GETFL),
1306 			default: (void)dispatch_assume_zero(err); break;
1307 		);
1308 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1309 		if (S_ISFIFO(st.st_mode)) {
1310 			_dispatch_io_syscall_switch(err,
1311 				orig_nosigpipe = fcntl(fd, F_GETNOSIGPIPE),
1312 				default: (void)dispatch_assume_zero(err); break;
1313 			);
1314 			if (orig_nosigpipe != -1) {
1315 				_dispatch_io_syscall_switch(err,
1316 					orig_nosigpipe = fcntl(fd, F_SETNOSIGPIPE, 1),
1317 					default:
1318 						orig_nosigpipe = -1;
1319 						(void)dispatch_assume_zero(err);
1320 						break;
1321 				);
1322 			}
1323 		}
1324 #endif
1325 		if (S_ISREG(st.st_mode)) {
1326 			if (orig_flags != -1) {
1327 				_dispatch_io_syscall_switch(err,
1328 					fcntl(fd, F_SETFL, orig_flags & ~O_NONBLOCK),
1329 					default:
1330 						orig_flags = -1;
1331 						(void)dispatch_assume_zero(err);
1332 						break;
1333 				);
1334 			}
1335 			int32_t dev = major(st.st_dev);
1336 			// We have to get the disk on the global dev queue. The
1337 			// barrier queue cannot continue until that is complete
1338 			dispatch_suspend(fd_entry->barrier_queue);
1339 			dispatch_once_f(&_dispatch_io_devs_lockq_pred, NULL,
1340 					_dispatch_io_devs_lockq_init);
1341 			dispatch_async(_dispatch_io_devs_lockq, ^{
1342 				_dispatch_disk_init(fd_entry, dev);
1343 				dispatch_resume(fd_entry->barrier_queue);
1344 			});
1345 		} else {
1346 			if (orig_flags != -1) {
1347 				_dispatch_io_syscall_switch(err,
1348 					fcntl(fd, F_SETFL, orig_flags | O_NONBLOCK),
1349 					default:
1350 						orig_flags = -1;
1351 						(void)dispatch_assume_zero(err);
1352 						break;
1353 				);
1354 			}
1355 			_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
1356 					_DISPATCH_QOS_CLASS_DEFAULT, false));
1357 		}
1358 		fd_entry->orig_flags = orig_flags;
1359 		fd_entry->orig_nosigpipe = orig_nosigpipe;
1360 	});
1361 	// This is the first item run when the close queue is resumed, indicating
1362 	// that all channels associated with this entry have been closed and that
1363 	// all operations associated with this entry have been freed
1364 	dispatch_async(fd_entry->close_queue, ^{
1365 		if (!fd_entry->disk) {
1366 			_dispatch_fd_debug("close queue fd_entry cleanup", fd);
1367 			dispatch_op_direction_t dir;
1368 			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
1369 				_dispatch_stream_dispose(fd_entry, dir);
1370 			}
1371 		} else {
1372 			dispatch_disk_t disk = fd_entry->disk;
1373 			dispatch_async(_dispatch_io_devs_lockq, ^{
1374 				_dispatch_release(disk);
1375 			});
1376 		}
1377 		// Remove this entry from the global fd list
1378 		TAILQ_REMOVE(&_dispatch_io_fds[hash], fd_entry, fd_list);
1379 	});
1380 	// If there was a source associated with this stream, disposing of the
1381 	// source cancels it and suspends the close queue. Freeing the fd_entry
1382 	// structure must happen after the source cancel handler has finished
1383 	dispatch_async(fd_entry->close_queue, ^{
1384 		_dispatch_fd_debug("close queue release", fd);
1385 		dispatch_release(fd_entry->close_queue);
1386 		_dispatch_fd_debug("barrier queue release", fd);
1387 		dispatch_release(fd_entry->barrier_queue);
1388 		_dispatch_fd_debug("barrier group release", fd);
1389 		dispatch_release(fd_entry->barrier_group);
1390 		if (fd_entry->orig_flags != -1) {
1391 			_dispatch_io_syscall(
1392 				fcntl(fd, F_SETFL, fd_entry->orig_flags)
1393 			);
1394 		}
1395 #if DISPATCH_USE_SETNOSIGPIPE // rdar://problem/4121123
1396 		if (fd_entry->orig_nosigpipe != -1) {
1397 			_dispatch_io_syscall(
1398 				fcntl(fd, F_SETNOSIGPIPE, fd_entry->orig_nosigpipe)
1399 			);
1400 		}
1401 #endif
1402 		_dispatch_fd_entry_unguard(fd_entry);
1403 		if (fd_entry->convenience_channel) {
1404 			fd_entry->convenience_channel->fd_entry = NULL;
1405 			dispatch_release(fd_entry->convenience_channel);
1406 		}
1407 		free(fd_entry);
1408 	});
1409 	return fd_entry;
1410 }
1411 
/*
 * Create the fd_entry for a path-based channel (called on the devs lock
 * queue).
 *
 * The file is not opened here: fd stays -1 and _dispatch_fd_entry_open()
 * opens path_data lazily. `dev`/`mode` (presumably from a stat() of the path
 * by the caller; confirm) select disk servicing for regular files and
 * streams otherwise. The close queue targets the channel's queue. Two
 * cleanup blocks are queued on it:
 *   1. dispose the streams, close the fd if it was opened, and detach the
 *      channel;
 *   2. release the queues/group and free path_data and the entry.
 */
static dispatch_fd_entry_t
_dispatch_fd_entry_create_with_path(dispatch_io_path_data_t path_data,
		dev_t dev, mode_t mode)
{
	// On devs lock queue
	_dispatch_fd_debug("fd entry create with path %s", -1, path_data->path);
	dispatch_fd_entry_t fd_entry = _dispatch_fd_entry_create(
			path_data->channel->queue);
	if (S_ISREG(mode)) {
		_dispatch_disk_init(fd_entry, major(dev));
	} else {
		_dispatch_stream_init(fd_entry, _dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, false));
	}
	// Not opened yet; no original flags to restore
	fd_entry->fd = -1;
	fd_entry->orig_flags = -1;
	fd_entry->path_data = path_data;
	fd_entry->stat.dev = dev;
	fd_entry->stat.mode = mode;
	fd_entry->barrier_queue = dispatch_queue_create(
			"com.apple.libdispatch-io.barrierq", NULL);
	fd_entry->barrier_group = dispatch_group_create();
	// This is the first item run when the close queue is resumed, indicating
	// that the channel associated with this entry has been closed and that
	// all operations associated with this entry have been freed
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue fd_entry cleanup", -1);
		if (!fd_entry->disk) {
			dispatch_op_direction_t dir;
			for (dir = 0; dir < DOP_DIR_MAX; dir++) {
				_dispatch_stream_dispose(fd_entry, dir);
			}
		}
		// Only close the descriptor if _dispatch_fd_entry_open() created one
		if (fd_entry->fd != -1) {
			_dispatch_fd_entry_guarded_close(fd_entry, fd_entry->fd);
		}
		if (fd_entry->path_data->channel) {
			// If associated channel has not been released yet, mark it as
			// no longer having an fd_entry (for stop after close).
			// It is safe to modify channel since we are on close_queue with
			// target queue the channel queue
			fd_entry->path_data->channel->fd_entry = NULL;
		}
	});
	dispatch_async(fd_entry->close_queue, ^{
		_dispatch_fd_debug("close queue release", -1);
		dispatch_release(fd_entry->close_queue);
		dispatch_release(fd_entry->barrier_queue);
		dispatch_release(fd_entry->barrier_group);
		free(fd_entry->path_data);
		free(fd_entry);
	});
	return fd_entry;
}
1466 
1467 static int
_dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry,dispatch_io_t channel)1468 _dispatch_fd_entry_open(dispatch_fd_entry_t fd_entry, dispatch_io_t channel)
1469 {
1470 	if (!(fd_entry->fd == -1 && fd_entry->path_data)) {
1471 		return 0;
1472 	}
1473 	if (fd_entry->err) {
1474 		return fd_entry->err;
1475 	}
1476 	int fd = -1;
1477 	int oflag = fd_entry->disk ? fd_entry->path_data->oflag & ~O_NONBLOCK :
1478 			fd_entry->path_data->oflag | O_NONBLOCK;
1479 open:
1480 	fd = _dispatch_fd_entry_guarded_open(fd_entry, fd_entry->path_data->path,
1481 			oflag, fd_entry->path_data->mode);
1482 	if (fd == -1) {
1483 		int err = errno;
1484 		if (err == EINTR) {
1485 			goto open;
1486 		}
1487 		(void)dispatch_atomic_cmpxchg2o(fd_entry, err, 0, err, relaxed);
1488 		return err;
1489 	}
1490 	if (!dispatch_atomic_cmpxchg2o(fd_entry, fd, -1, fd, relaxed)) {
1491 		// Lost the race with another open
1492 		_dispatch_fd_entry_guarded_close(fd_entry, fd);
1493 	} else {
1494 		channel->fd_actual = fd;
1495 	}
1496 	_dispatch_object_debug(channel, "%s", __func__);
1497 	return 0;
1498 }
1499 
1500 static void
_dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,dispatch_io_t channel)1501 _dispatch_fd_entry_cleanup_operations(dispatch_fd_entry_t fd_entry,
1502 		dispatch_io_t channel)
1503 {
1504 	if (fd_entry->disk) {
1505 		if (channel) {
1506 			_dispatch_retain(channel);
1507 		}
1508 		_dispatch_fd_entry_retain(fd_entry);
1509 		dispatch_async(fd_entry->disk->pick_queue, ^{
1510 			_dispatch_disk_cleanup_operations(fd_entry->disk, channel);
1511 			_dispatch_fd_entry_release(fd_entry);
1512 			if (channel) {
1513 				_dispatch_release(channel);
1514 			}
1515 		});
1516 	} else {
1517 		dispatch_op_direction_t direction;
1518 		for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1519 			dispatch_stream_t stream = fd_entry->streams[direction];
1520 			if (!stream) {
1521 				continue;
1522 			}
1523 			if (channel) {
1524 				_dispatch_retain(channel);
1525 			}
1526 			_dispatch_fd_entry_retain(fd_entry);
1527 			dispatch_async(stream->dq, ^{
1528 				_dispatch_stream_cleanup_operations(stream, channel);
1529 				_dispatch_fd_entry_release(fd_entry);
1530 				if (channel) {
1531 					_dispatch_release(channel);
1532 				}
1533 			});
1534 		}
1535 	}
1536 }
1537 
1538 #pragma mark -
1539 #pragma mark dispatch_stream_t/dispatch_disk_t
1540 
1541 static void
_dispatch_stream_init(dispatch_fd_entry_t fd_entry,dispatch_queue_t tq)1542 _dispatch_stream_init(dispatch_fd_entry_t fd_entry, dispatch_queue_t tq)
1543 {
1544 	dispatch_op_direction_t direction;
1545 	for (direction = 0; direction < DOP_DIR_MAX; direction++) {
1546 		dispatch_stream_t stream;
1547 		stream = _dispatch_calloc(1ul, sizeof(struct dispatch_stream_s));
1548 		stream->dq = dispatch_queue_create("com.apple.libdispatch-io.streamq",
1549 				NULL);
1550 		dispatch_set_context(stream->dq, stream);
1551 		_dispatch_retain(tq);
1552 		stream->dq->do_targetq = tq;
1553 		TAILQ_INIT(&stream->operations[DISPATCH_IO_RANDOM]);
1554 		TAILQ_INIT(&stream->operations[DISPATCH_IO_STREAM]);
1555 		fd_entry->streams[direction] = stream;
1556 	}
1557 }
1558 
/*
 * Free the stream for `direction` (called on the close queue); a missing
 * stream is ignored. Both of its operation lists must already be empty.
 *
 * If the stream has a dispatch source, it is cancelled, resumed and
 * released. An extra fd_entry reference is taken first and is balanced by
 * the source's cancel handler, so the fd_entry cannot be freed before that
 * handler has run.
 */
static void
_dispatch_stream_dispose(dispatch_fd_entry_t fd_entry,
		dispatch_op_direction_t direction)
{
	// On close queue
	dispatch_stream_t stream = fd_entry->streams[direction];
	if (!stream) {
		return;
	}
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
	dispatch_assert(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM]));
	if (stream->source) {
		// Balanced by source cancel handler:
		_dispatch_fd_entry_retain(fd_entry);
		dispatch_source_cancel(stream->source);
		// NOTE(review): presumably the source may be suspended at this point;
		// resuming lets the cancellation (and its handler) be delivered.
		// Confirm against the source setup code.
		dispatch_resume(stream->source);
		dispatch_release(stream->source);
	}
	dispatch_set_context(stream->dq, NULL);
	dispatch_release(stream->dq);
	free(stream);
}
1581 
1582 static void
_dispatch_disk_init(dispatch_fd_entry_t fd_entry,dev_t dev)1583 _dispatch_disk_init(dispatch_fd_entry_t fd_entry, dev_t dev)
1584 {
1585 	// On devs lock queue
1586 	dispatch_disk_t disk;
1587 	// Check to see if there is an existing entry for the given device
1588 	uintptr_t hash = DIO_HASH(dev);
1589 	TAILQ_FOREACH(disk, &_dispatch_io_devs[hash], disk_list) {
1590 		if (disk->dev == dev) {
1591 			_dispatch_retain(disk);
1592 			goto out;
1593 		}
1594 	}
1595 	// Otherwise create a new entry
1596 	size_t pending_reqs_depth = dispatch_io_defaults.max_pending_io_reqs;
1597 	disk = _dispatch_alloc(DISPATCH_VTABLE(disk),
1598 			sizeof(struct dispatch_disk_s) +
1599 			(pending_reqs_depth * sizeof(dispatch_operation_t)));
1600 	disk->do_next = DISPATCH_OBJECT_LISTLESS;
1601 	disk->do_xref_cnt = -1;
1602 	disk->advise_list_depth = pending_reqs_depth;
1603 	disk->do_targetq = _dispatch_get_root_queue(_DISPATCH_QOS_CLASS_DEFAULT,
1604 			false);
1605 	disk->dev = dev;
1606 	TAILQ_INIT(&disk->operations);
1607 	disk->cur_rq = TAILQ_FIRST(&disk->operations);
1608 	char label[45];
1609 	snprintf(label, sizeof(label), "com.apple.libdispatch-io.deviceq.%d", dev);
1610 	disk->pick_queue = dispatch_queue_create(label, NULL);
1611 	TAILQ_INSERT_TAIL(&_dispatch_io_devs[hash], disk, disk_list);
1612 out:
1613 	fd_entry->disk = disk;
1614 	TAILQ_INIT(&fd_entry->stream_ops);
1615 }
1616 
1617 void
_dispatch_disk_dispose(dispatch_disk_t disk)1618 _dispatch_disk_dispose(dispatch_disk_t disk)
1619 {
1620 	uintptr_t hash = DIO_HASH(disk->dev);
1621 	TAILQ_REMOVE(&_dispatch_io_devs[hash], disk, disk_list);
1622 	dispatch_assert(TAILQ_EMPTY(&disk->operations));
1623 	size_t i;
1624 	for (i=0; i<disk->advise_list_depth; ++i) {
1625 		dispatch_assert(!disk->advise_list[i]);
1626 	}
1627 	dispatch_release(disk->pick_queue);
1628 }
1629 
1630 #pragma mark -
1631 #pragma mark dispatch_stream_operations/dispatch_disk_operations
1632 
1633 static inline bool
_dispatch_stream_operation_avail(dispatch_stream_t stream)1634 _dispatch_stream_operation_avail(dispatch_stream_t stream)
1635 {
1636 	return  !(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) ||
1637 			!(TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM]));
1638 }
1639 
1640 static void
_dispatch_stream_enqueue_operation(dispatch_stream_t stream,dispatch_operation_t op,dispatch_data_t data)1641 _dispatch_stream_enqueue_operation(dispatch_stream_t stream,
1642 		dispatch_operation_t op, dispatch_data_t data)
1643 {
1644 	if (!_dispatch_operation_should_enqueue(op, stream->dq, data)) {
1645 		return;
1646 	}
1647 	_dispatch_object_debug(op, "%s", __func__);
1648 	bool no_ops = !_dispatch_stream_operation_avail(stream);
1649 	TAILQ_INSERT_TAIL(&stream->operations[op->params.type], op, operation_list);
1650 	if (no_ops) {
1651 		dispatch_async_f(stream->dq, stream->dq,
1652 				_dispatch_stream_queue_handler);
1653 	}
1654 }
1655 
1656 static void
_dispatch_disk_enqueue_operation(dispatch_disk_t disk,dispatch_operation_t op,dispatch_data_t data)1657 _dispatch_disk_enqueue_operation(dispatch_disk_t disk, dispatch_operation_t op,
1658 		dispatch_data_t data)
1659 {
1660 	if (!_dispatch_operation_should_enqueue(op, disk->pick_queue, data)) {
1661 		return;
1662 	}
1663 	_dispatch_object_debug(op, "%s", __func__);
1664 	if (op->params.type == DISPATCH_IO_STREAM) {
1665 		if (TAILQ_EMPTY(&op->fd_entry->stream_ops)) {
1666 			TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1667 		}
1668 		TAILQ_INSERT_TAIL(&op->fd_entry->stream_ops, op, stream_list);
1669 	} else {
1670 		TAILQ_INSERT_TAIL(&disk->operations, op, operation_list);
1671 	}
1672 	_dispatch_disk_handler(disk);
1673 }
1674 
1675 static void
_dispatch_stream_complete_operation(dispatch_stream_t stream,dispatch_operation_t op)1676 _dispatch_stream_complete_operation(dispatch_stream_t stream,
1677 		dispatch_operation_t op)
1678 {
1679 	// On stream queue
1680 	_dispatch_object_debug(op, "%s", __func__);
1681 	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
1682 	TAILQ_REMOVE(&stream->operations[op->params.type], op, operation_list);
1683 	if (op == stream->op) {
1684 		stream->op = NULL;
1685 	}
1686 	if (op->timer) {
1687 		dispatch_source_cancel(op->timer);
1688 	}
1689 	// Final release will deliver any pending data
1690 	_dispatch_release(op);
1691 }
1692 
/*
 * Retire `op` from `disk` (called on the pick queue).
 *
 * If `op` is the disk's current request, cur_rq is first stepped back to the
 * previous operation in the list, so it never points at an unlinked
 * operation. For DISPATCH_IO_STREAM operations, only the head of each fd's
 * stream_ops list sits on the disk list (see
 * _dispatch_disk_enqueue_operation). The next pending stream operation of
 * the same fd, if any, is therefore promoted onto the disk list. Finally the
 * interval timer is cancelled and the queued reference is dropped.
 */
static void
_dispatch_disk_complete_operation(dispatch_disk_t disk, dispatch_operation_t op)
{
	// On pick queue
	_dispatch_object_debug(op, "%s", __func__);
	_dispatch_fd_debug("complete operation", op->fd_entry->fd);
	// Current request is always the last op returned
	if (disk->cur_rq == op) {
		disk->cur_rq = TAILQ_PREV(op, dispatch_disk_operations_s,
				operation_list);
	}
	if (op->params.type == DISPATCH_IO_STREAM) {
		// Check if there are other pending stream operations behind it
		dispatch_operation_t op_next = TAILQ_NEXT(op, stream_list);
		TAILQ_REMOVE(&op->fd_entry->stream_ops, op, stream_list);
		if (op_next) {
			TAILQ_INSERT_TAIL(&disk->operations, op_next, operation_list);
		}
	}
	TAILQ_REMOVE(&disk->operations, op, operation_list);
	if (op->timer) {
		dispatch_source_cancel(op->timer);
	}
	// Final release will deliver any pending data
	_dispatch_release(op);
}
1719 
1720 static dispatch_operation_t
_dispatch_stream_pick_next_operation(dispatch_stream_t stream,dispatch_operation_t op)1721 _dispatch_stream_pick_next_operation(dispatch_stream_t stream,
1722 		dispatch_operation_t op)
1723 {
1724 	// On stream queue
1725 	if (!op) {
1726 		// On the first run through, pick the first operation
1727 		if (!_dispatch_stream_operation_avail(stream)) {
1728 			return op;
1729 		}
1730 		if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_STREAM])) {
1731 			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_STREAM]);
1732 		} else if (!TAILQ_EMPTY(&stream->operations[DISPATCH_IO_RANDOM])) {
1733 			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1734 		}
1735 		return op;
1736 	}
1737 	if (op->params.type == DISPATCH_IO_STREAM) {
1738 		// Stream operations need to be serialized so continue the current
1739 		// operation until it is finished
1740 		return op;
1741 	}
1742 	// Get the next random operation (round-robin)
1743 	if (op->params.type == DISPATCH_IO_RANDOM) {
1744 		op = TAILQ_NEXT(op, operation_list);
1745 		if (!op) {
1746 			op = TAILQ_FIRST(&stream->operations[DISPATCH_IO_RANDOM]);
1747 		}
1748 		return op;
1749 	}
1750 	return NULL;
1751 }
1752 
1753 static dispatch_operation_t
_dispatch_disk_pick_next_operation(dispatch_disk_t disk)1754 _dispatch_disk_pick_next_operation(dispatch_disk_t disk)
1755 {
1756 	// On pick queue
1757 	dispatch_operation_t op;
1758 	if (!TAILQ_EMPTY(&disk->operations)) {
1759 		if (disk->cur_rq == NULL) {
1760 			op = TAILQ_FIRST(&disk->operations);
1761 		} else {
1762 			op = disk->cur_rq;
1763 			do {
1764 				op = TAILQ_NEXT(op, operation_list);
1765 				if (!op) {
1766 					op = TAILQ_FIRST(&disk->operations);
1767 				}
1768 				// TODO: more involved picking algorithm rdar://problem/8780312
1769 			} while (op->active && op != disk->cur_rq);
1770 		}
1771 		if (!op->active) {
1772 			disk->cur_rq = op;
1773 			return op;
1774 		}
1775 	}
1776 	return NULL;
1777 }
1778 
1779 static void
_dispatch_stream_cleanup_operations(dispatch_stream_t stream,dispatch_io_t channel)1780 _dispatch_stream_cleanup_operations(dispatch_stream_t stream,
1781 		dispatch_io_t channel)
1782 {
1783 	// On stream queue
1784 	dispatch_operation_t op, tmp;
1785 	typeof(*stream->operations) *operations;
1786 	operations = &stream->operations[DISPATCH_IO_RANDOM];
1787 	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1788 		if (!channel || op->channel == channel) {
1789 			_dispatch_stream_complete_operation(stream, op);
1790 		}
1791 	}
1792 	operations = &stream->operations[DISPATCH_IO_STREAM];
1793 	TAILQ_FOREACH_SAFE(op, operations, operation_list, tmp) {
1794 		if (!channel || op->channel == channel) {
1795 			_dispatch_stream_complete_operation(stream, op);
1796 		}
1797 	}
1798 	if (stream->source_running && !_dispatch_stream_operation_avail(stream)) {
1799 		dispatch_suspend(stream->source);
1800 		stream->source_running = false;
1801 	}
1802 }
1803 
1804 static void
_dispatch_disk_cleanup_operations(dispatch_disk_t disk,dispatch_io_t channel)1805 _dispatch_disk_cleanup_operations(dispatch_disk_t disk, dispatch_io_t channel)
1806 {
1807 	// On pick queue
1808 	dispatch_operation_t op, tmp;
1809 	TAILQ_FOREACH_SAFE(op, &disk->operations, operation_list, tmp) {
1810 		if (!channel || op->channel == channel) {
1811 			_dispatch_disk_complete_operation(disk, op);
1812 		}
1813 	}
1814 }
1815 
1816 #pragma mark -
1817 #pragma mark dispatch_stream_handler/dispatch_disk_handler
1818 
1819 static dispatch_source_t
_dispatch_stream_source(dispatch_stream_t stream,dispatch_operation_t op)1820 _dispatch_stream_source(dispatch_stream_t stream, dispatch_operation_t op)
1821 {
1822 	// On stream queue
1823 	if (stream->source) {
1824 		return stream->source;
1825 	}
1826 	dispatch_fd_t fd = op->fd_entry->fd;
1827 	_dispatch_fd_debug("stream source create", fd);
1828 	dispatch_source_t source = NULL;
1829 	if (op->direction == DOP_DIR_READ) {
1830 		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ,
1831 				(uintptr_t)fd, 0, stream->dq);
1832 	} else if (op->direction == DOP_DIR_WRITE) {
1833 		source = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE,
1834 				(uintptr_t)fd, 0, stream->dq);
1835 	} else {
1836 		dispatch_assert(op->direction < DOP_DIR_MAX);
1837 		return NULL;
1838 	}
1839 	dispatch_set_context(source, stream);
1840 	dispatch_source_set_event_handler_f(source,
1841 			_dispatch_stream_source_handler);
1842 	// Close queue must not run user cleanup handlers until sources are fully
1843 	// unregistered
1844 	dispatch_queue_t close_queue = op->fd_entry->close_queue;
1845 	dispatch_source_set_cancel_handler(source, ^{
1846 		_dispatch_fd_debug("stream source cancel", fd);
1847 		dispatch_resume(close_queue);
1848 	});
1849 	stream->source = source;
1850 	return stream->source;
1851 }
1852 
1853 static void
_dispatch_stream_source_handler(void * ctx)1854 _dispatch_stream_source_handler(void *ctx)
1855 {
1856 	// On stream queue
1857 	dispatch_stream_t stream = (dispatch_stream_t)ctx;
1858 	dispatch_suspend(stream->source);
1859 	stream->source_running = false;
1860 	return _dispatch_stream_handler(stream);
1861 }
1862 
1863 static void
_dispatch_stream_queue_handler(void * ctx)1864 _dispatch_stream_queue_handler(void *ctx)
1865 {
1866 	// On stream queue
1867 	dispatch_stream_t stream = (dispatch_stream_t)dispatch_get_context(ctx);
1868 	if (!stream) {
1869 		// _dispatch_stream_dispose has been called
1870 		return;
1871 	}
1872 	return _dispatch_stream_handler(stream);
1873 }
1874 
/*
 * Services the pending operations of a stream on the stream queue.
 *
 * Picks the next operation (_dispatch_stream_pick_next_operation). If the
 * channel/fd already reports an error, that operation is completed with the
 * error and the next one is picked. Otherwise it becomes stream->op, one
 * read/write is done via _dispatch_operation_perform, and the DISPATCH_OP_*
 * result decides whether to deliver data, complete the operation, re-run
 * this handler asynchronously, resume the fd dispatch source, or clean up
 * operations after an error.
 *
 * @ctx: the dispatch_stream_t to service.
 */
1875 static void
_dispatch_stream_handler(void * ctx)1876 _dispatch_stream_handler(void *ctx)
1877 {
1878 	// On stream queue
1879 	dispatch_stream_t stream = (dispatch_stream_t)ctx;
1880 	dispatch_operation_t op;
1881 pick:
1882 	op = _dispatch_stream_pick_next_operation(stream, stream->op);
1883 	if (!op) {
1884 		_dispatch_debug("no operation found: stream %p", stream);
1885 		return;
1886 	}
	// Operations that already carry a channel/fd error are completed here
	// without doing any I/O, then the next one is picked
1887 	int err = _dispatch_io_get_error(op, NULL, true);
1888 	if (err) {
1889 		op->err = err;
1890 		_dispatch_stream_complete_operation(stream, op);
1891 		goto pick;
1892 	}
1893 	stream->op = op;
1894 	_dispatch_fd_debug("stream handler", op->fd_entry->fd);
	// Keep the fd entry alive for the rest of this function (released at
	// the end), since op itself may be released by a completion below
1895 	dispatch_fd_entry_t fd_entry = op->fd_entry;
1896 	_dispatch_fd_entry_retain(fd_entry);
1897 	// For performance analysis
1898 	if (!op->total && dispatch_io_defaults.initial_delivery) {
1899 		// Empty delivery to signal the start of the operation
1900 		_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
1901 		_dispatch_operation_deliver_data(op, DOP_DELIVER);
1902 	}
1903 	// TODO: perform on the operation target queue to get correct priority
1904 	int result = _dispatch_operation_perform(op);
	// flags also acts as a marker for the fall-through cases below: it is
	// DOP_DEFAULT only for DISPATCH_OP_DELIVER (deliver but keep the
	// operation) and stays non-default for DISPATCH_OP_DELIVER_AND_COMPLETE
	// and DISPATCH_OP_COMPLETE (complete the operation)
1905 	dispatch_op_flags_t flags = ~0u;
1906 	switch (result) {
1907 	case DISPATCH_OP_DELIVER:
1908 		flags = DOP_DEFAULT;
1909 		// Fall through
1910 	case DISPATCH_OP_DELIVER_AND_COMPLETE:
1911 		flags = (flags != DOP_DEFAULT) ? DOP_DELIVER | DOP_NO_EMPTY :
1912 				DOP_DEFAULT;
1913 		_dispatch_operation_deliver_data(op, flags);
1914 		// Fall through
1915 	case DISPATCH_OP_COMPLETE:
1916 		if (flags != DOP_DEFAULT) {
1917 			_dispatch_stream_complete_operation(stream, op);
1918 		}
		// More work queued: re-run this handler asynchronously on the
		// stream queue
1919 		if (_dispatch_stream_operation_avail(stream)) {
1920 			dispatch_async_f(stream->dq, stream->dq,
1921 					_dispatch_stream_queue_handler);
1922 		}
1923 		break;
1924 	case DISPATCH_OP_COMPLETE_RESUME:
1925 		_dispatch_stream_complete_operation(stream, op);
1926 		// Fall through
1927 	case DISPATCH_OP_RESUME:
		// The fd would block: resume the fd source so this handler runs
		// again once the fd is ready.
		// NOTE(review): on the COMPLETE_RESUME path op has already been
		// released by _dispatch_stream_complete_operation, and
		// _dispatch_stream_source dereferences op if stream->source has not
		// been created yet -- confirm op is still alive in that case.
1928 		if (_dispatch_stream_operation_avail(stream)) {
1929 			stream->source_running = true;
1930 			dispatch_resume(_dispatch_stream_source(stream, op));
1931 		}
1932 		break;
1933 	case DISPATCH_OP_ERR:
		// Channel-level error (ECANCELED): drop this channel's operations
1934 		_dispatch_stream_cleanup_operations(stream, op->channel);
1935 		break;
1936 	case DISPATCH_OP_FD_ERR:
		// fd-level error (EBADF): clean up every operation of the fd entry
		// on its barrier queue; the extra retain keeps fd_entry alive until
		// that block has run
1937 		_dispatch_fd_entry_retain(fd_entry);
1938 		dispatch_async(fd_entry->barrier_queue, ^{
1939 			_dispatch_fd_entry_cleanup_operations(fd_entry, NULL);
1940 			_dispatch_fd_entry_release(fd_entry);
1941 		});
1942 		break;
1943 	default:
1944 		break;
1945 	}
1946 	_dispatch_fd_entry_release(fd_entry);
1947 	return;
1948 }
1949 
1950 static void
_dispatch_disk_handler(void * ctx)1951 _dispatch_disk_handler(void *ctx)
1952 {
1953 	// On pick queue
1954 	dispatch_disk_t disk = (dispatch_disk_t)ctx;
1955 	if (disk->io_active) {
1956 		return;
1957 	}
1958 	_dispatch_fd_debug("disk handler", -1);
1959 	dispatch_operation_t op;
1960 	size_t i = disk->free_idx, j = disk->req_idx;
1961 	if (j <= i) {
1962 		j += disk->advise_list_depth;
1963 	}
1964 	while (i <= j) {
1965 		if ((!disk->advise_list[i%disk->advise_list_depth]) &&
1966 				(op = _dispatch_disk_pick_next_operation(disk))) {
1967 			int err = _dispatch_io_get_error(op, NULL, true);
1968 			if (err) {
1969 				op->err = err;
1970 				_dispatch_disk_complete_operation(disk, op);
1971 				continue;
1972 			}
1973 			_dispatch_retain(op);
1974 			disk->advise_list[i%disk->advise_list_depth] = op;
1975 			op->active = true;
1976 			_dispatch_object_debug(op, "%s", __func__);
1977 		} else {
1978 			// No more operations to get
1979 			break;
1980 		}
1981 		i++;
1982 	}
1983 	disk->free_idx = (i%disk->advise_list_depth);
1984 	op = disk->advise_list[disk->req_idx];
1985 	if (op) {
1986 		disk->io_active = true;
1987 		dispatch_async_f(op->do_targetq, disk, _dispatch_disk_perform);
1988 	}
1989 }
1990 
1991 static void
_dispatch_disk_perform(void * ctxt)1992 _dispatch_disk_perform(void *ctxt)
1993 {
1994 	dispatch_disk_t disk = ctxt;
1995 	size_t chunk_size = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
1996 	_dispatch_fd_debug("disk perform", -1);
1997 	dispatch_operation_t op;
1998 	size_t i = disk->advise_idx, j = disk->free_idx;
1999 	if (j <= i) {
2000 		j += disk->advise_list_depth;
2001 	}
2002 	do {
2003 		op = disk->advise_list[i%disk->advise_list_depth];
2004 		if (!op) {
2005 			// Nothing more to advise, must be at free_idx
2006 			dispatch_assert(i%disk->advise_list_depth == disk->free_idx);
2007 			break;
2008 		}
2009 		if (op->direction == DOP_DIR_WRITE) {
2010 			// TODO: preallocate writes ? rdar://problem/9032172
2011 			continue;
2012 		}
2013 		if (op->fd_entry->fd == -1 && _dispatch_fd_entry_open(op->fd_entry,
2014 				op->channel)) {
2015 			continue;
2016 		}
2017 		// For performance analysis
2018 		if (!op->total && dispatch_io_defaults.initial_delivery) {
2019 			// Empty delivery to signal the start of the operation
2020 			_dispatch_fd_debug("initial delivery", op->fd_entry->fd);
2021 			_dispatch_operation_deliver_data(op, DOP_DELIVER);
2022 		}
2023 		// Advise two chunks if the list only has one element and this is the
2024 		// first advise on the operation
2025 		if ((j-i) == 1 && !disk->advise_list[disk->free_idx] &&
2026 				!op->advise_offset) {
2027 			chunk_size *= 2;
2028 		}
2029 		_dispatch_operation_advise(op, chunk_size);
2030 	} while (++i < j);
2031 	disk->advise_idx = i%disk->advise_list_depth;
2032 	op = disk->advise_list[disk->req_idx];
2033 	int result = _dispatch_operation_perform(op);
2034 	disk->advise_list[disk->req_idx] = NULL;
2035 	disk->req_idx = (++disk->req_idx)%disk->advise_list_depth;
2036 	dispatch_async(disk->pick_queue, ^{
2037 		switch (result) {
2038 		case DISPATCH_OP_DELIVER:
2039 			_dispatch_operation_deliver_data(op, DOP_DEFAULT);
2040 			break;
2041 		case DISPATCH_OP_COMPLETE:
2042 			_dispatch_disk_complete_operation(disk, op);
2043 			break;
2044 		case DISPATCH_OP_DELIVER_AND_COMPLETE:
2045 			_dispatch_operation_deliver_data(op, DOP_DELIVER | DOP_NO_EMPTY);
2046 			_dispatch_disk_complete_operation(disk, op);
2047 			break;
2048 		case DISPATCH_OP_ERR:
2049 			_dispatch_disk_cleanup_operations(disk, op->channel);
2050 			break;
2051 		case DISPATCH_OP_FD_ERR:
2052 			_dispatch_disk_cleanup_operations(disk, NULL);
2053 			break;
2054 		default:
2055 			dispatch_assert(result);
2056 			break;
2057 		}
2058 		op->active = false;
2059 		disk->io_active = false;
2060 		_dispatch_disk_handler(disk);
2061 		// Balancing the retain in _dispatch_disk_handler. Note that op must be
2062 		// released at the very end, since it might hold the last reference to
2063 		// the disk
2064 		_dispatch_release(op);
2065 	});
2066 }
2067 
2068 #pragma mark -
2069 #pragma mark dispatch_operation_perform
2070 
2071 static void
_dispatch_operation_advise(dispatch_operation_t op,size_t chunk_size)2072 _dispatch_operation_advise(dispatch_operation_t op, size_t chunk_size)
2073 {
2074 	int err;
2075 	struct radvisory advise;
2076 
2077 	// No point in issuing a read advise for the next chunk if we are already
2078 	// a chunk ahead from reading the bytes
2079 	if (op->advise_offset > (off_t)(((size_t)op->offset + op->total) +
2080 			chunk_size + PAGE_SIZE)) {
2081 		return;
2082 	}
2083 	_dispatch_object_debug(op, "%s", __func__);
2084 	advise.ra_count = (int)chunk_size;
2085 	if (!op->advise_offset) {
2086 		op->advise_offset = op->offset;
2087 		// If this is the first time through, align the advised range to a
2088 		// page boundary
2089 		size_t pg_fraction = ((size_t)op->offset + chunk_size) % PAGE_SIZE;
2090 
2091 		advise.ra_count += (int)(pg_fraction ? PAGE_SIZE - pg_fraction : 0);
2092 	}
2093 	advise.ra_offset = op->advise_offset;
2094 	op->advise_offset += advise.ra_count;
2095 	_dispatch_io_syscall_switch(err,
2096 		fcntl(op->fd_entry->fd, F_RDADVISE, &advise),
2097 		case EFBIG: break; // advised past the end of the file rdar://10415691
2098 		case ENOTSUP: break; // not all FS support radvise rdar://13484629
2099 		// TODO: set disk status on error
2100 		default: (void)dispatch_assume_zero(err); break;
2101 	);
2102 }
2103 
/*
 * Performs a single read(2)/pread(2)/write(2)/pwrite(2) call for @op and
 * classifies the outcome as a DISPATCH_OP_* code for the caller
 * (_dispatch_stream_handler or _dispatch_disk_perform).
 *
 * If op->buf is not set up yet:
 *  - reads: a valloc'ed buffer is sized to at most chunk_pages * PAGE_SIZE,
 *    to at most params.high minus the data already held in op->data, and
 *    (for bounded operations) to at most the remaining length;
 *  - writes: a mapped prefix of op->data is used, made of the first data
 *    piece plus further whole pieces while the total stays within one chunk
 *    (chunk capped at params.high), then capped at params.high.
 * The fd is opened lazily if it is still -1.
 *
 * Returns:
 *   DISPATCH_OP_DELIVER               bytes transferred, operation not done
 *   DISPATCH_OP_COMPLETE              op->total reached op->length, or an
 *                                     error other than the ones below
 *                                     (stored in op->err)
 *   DISPATCH_OP_DELIVER_AND_COMPLETE  the syscall transferred 0 bytes (EOF)
 *   DISPATCH_OP_RESUME                EAGAIN: wait for the fd to be ready
 *   DISPATCH_OP_COMPLETE_RESUME       EAGAIN on a convenience-channel read
 *                                     that has already read some data
 *   DISPATCH_OP_ERR                   ECANCELED (stored in op->err)
 *   DISPATCH_OP_FD_ERR                EBADF (stored in op->err, and in
 *                                     fd_entry->err if that was still 0)
 */
2104 static int
_dispatch_operation_perform(dispatch_operation_t op)2105 _dispatch_operation_perform(dispatch_operation_t op)
2106 {
2107 	int err = _dispatch_io_get_error(op, NULL, true);
2108 	if (err) {
2109 		goto error;
2110 	}
2111 	_dispatch_object_debug(op, "%s", __func__);
2112 	if (!op->buf) {
2113 		size_t max_buf_siz = op->params.high;
2114 		size_t chunk_siz = dispatch_io_defaults.chunk_pages * PAGE_SIZE;
2115 		if (op->direction == DOP_DIR_READ) {
2116 			// If necessary, create a buffer for the ongoing operation, large
2117 			// enough to fit chunk_pages but at most high-water
2118 			size_t data_siz = dispatch_data_get_size(op->data);
2119 			if (data_siz) {
2120 				dispatch_assert(data_siz < max_buf_siz);
2121 				max_buf_siz -= data_siz;
2122 			}
2123 			if (max_buf_siz > chunk_siz) {
2124 				max_buf_siz = chunk_siz;
2125 			}
			// SIZE_MAX length means "no fixed length": use the full buffer
2126 			if (op->length < SIZE_MAX) {
2127 				op->buf_siz = op->length - op->total;
2128 				if (op->buf_siz > max_buf_siz) {
2129 					op->buf_siz = max_buf_siz;
2130 				}
2131 			} else {
2132 				op->buf_siz = max_buf_siz;
2133 			}
2134 			op->buf = valloc(op->buf_siz);
2135 			_dispatch_fd_debug("buffer allocated", op->fd_entry->fd);
2136 		} else if (op->direction == DOP_DIR_WRITE) {
2137 			// Always write the first data piece, if that is smaller than a
2138 			// chunk, accumulate further data pieces until chunk size is reached
2139 			if (chunk_siz > max_buf_siz) {
2140 				chunk_siz = max_buf_siz;
2141 			}
2142 			op->buf_siz = 0;
2143 			dispatch_data_apply(op->data,
2144 					^(dispatch_data_t region DISPATCH_UNUSED,
2145 					size_t offset DISPATCH_UNUSED,
2146 					const void* buf DISPATCH_UNUSED, size_t len) {
2147 				size_t siz = op->buf_siz + len;
2148 				if (!op->buf_siz || siz <= chunk_siz) {
2149 					op->buf_siz = siz;
2150 				}
2151 				return (bool)(siz < chunk_siz);
2152 			});
2153 			if (op->buf_siz > max_buf_siz) {
2154 				op->buf_siz = max_buf_siz;
2155 			}
			// Map the chosen prefix into one contiguous buffer; buf_data
			// keeps the mapping alive while op->buf points into it
2156 			dispatch_data_t d;
2157 			d = dispatch_data_create_subrange(op->data, 0, op->buf_siz);
2158 			op->buf_data = dispatch_data_create_map(d, (const void**)&op->buf,
2159 					NULL);
2160 			_dispatch_io_data_release(d);
2161 			_dispatch_fd_debug("buffer mapped", op->fd_entry->fd);
2162 		}
2163 	}
2164 	if (op->fd_entry->fd == -1) {
2165 		err = _dispatch_fd_entry_open(op->fd_entry, op->channel);
2166 		if (err) {
2167 			goto error;
2168 		}
2169 	}
	// Continue where the previous call left off in the current buffer;
	// random-access I/O uses the absolute file position offset + total
2170 	void *buf = (uint8_t *)op->buf + op->buf_len;
2171 	size_t len = op->buf_siz - op->buf_len;
2172 	off_t off = (off_t)((size_t)op->offset + op->total);
2173 	ssize_t processed = -1;
	// Re-entered on EINTR
2174 syscall:
2175 	if (op->direction == DOP_DIR_READ) {
2176 		if (op->params.type == DISPATCH_IO_STREAM) {
2177 			processed = read(op->fd_entry->fd, buf, len);
2178 		} else if (op->params.type == DISPATCH_IO_RANDOM) {
2179 			processed = pread(op->fd_entry->fd, buf, len, off);
2180 		}
2181 	} else if (op->direction == DOP_DIR_WRITE) {
2182 		if (op->params.type == DISPATCH_IO_STREAM) {
2183 			processed = write(op->fd_entry->fd, buf, len);
2184 		} else if (op->params.type == DISPATCH_IO_RANDOM) {
2185 			processed = pwrite(op->fd_entry->fd, buf, len, off);
2186 		}
2187 	}
2188 	// Encountered an error on the file descriptor
2189 	if (processed == -1) {
2190 		err = errno;
2191 		if (err == EINTR) {
2192 			goto syscall;
2193 		}
2194 		goto error;
2195 	}
2196 	// EOF is indicated by two handler invocations
2197 	if (processed == 0) {
2198 		_dispatch_fd_debug("EOF", op->fd_entry->fd);
2199 		return DISPATCH_OP_DELIVER_AND_COMPLETE;
2200 	}
2201 	op->buf_len += (size_t)processed;
2202 	op->total += (size_t)processed;
2203 	if (op->total == op->length) {
2204 		// Finished processing all the bytes requested by the operation
2205 		return DISPATCH_OP_COMPLETE;
2206 	} else {
2207 		// Deliver data only if we satisfy the filters
2208 		return DISPATCH_OP_DELIVER;
2209 	}
2210 error:
2211 	if (err == EAGAIN) {
2212 		// For disk based files with blocking I/O we should never get EAGAIN
2213 		dispatch_assert(!op->fd_entry->disk);
2214 		_dispatch_fd_debug("EAGAIN %d", op->fd_entry->fd, err);
2215 		if (op->direction == DOP_DIR_READ && op->total &&
2216 				op->channel == op->fd_entry->convenience_channel) {
2217 			// Convenience read with available data completes on EAGAIN
2218 			return DISPATCH_OP_COMPLETE_RESUME;
2219 		}
2220 		return DISPATCH_OP_RESUME;
2221 	}
2222 	op->err = err;
2223 	switch (err) {
2224 	case ECANCELED:
2225 		return DISPATCH_OP_ERR;
2226 	case EBADF:
		// Record EBADF on the fd entry unless it already holds an error
2227 		(void)dispatch_atomic_cmpxchg2o(op->fd_entry, err, 0, err, relaxed);
2228 		return DISPATCH_OP_FD_ERR;
2229 	default:
2230 		return DISPATCH_OP_COMPLETE;
2231 	}
2232 }
2233 
/*
 * Hands data accumulated by @op to its I/O handler asynchronously on
 * op->op_q, or keeps buffering it.
 *
 * Data is delivered when @flags contains DOP_DELIVER or DOP_DONE, when
 * op->flags had DOP_DELIVER set, or when the undelivered byte count has
 * reached params.low. op->flags is reset to DOP_DEFAULT in every case.
 * Otherwise nothing happens while op->buf still has room. A full buffer is
 * first folded into op->data (reads) or trimmed from the head of op->data
 * (writes), and the byte count is remembered in op->undelivered.
 *
 * Reads deliver the data read so far. Writes deliver the part of op->data
 * past the bytes written in the current buffer. With DOP_NO_EMPTY an empty
 * delivery is suppressed. When DOP_DONE is set, a read that ended in error
 * first gets any data as a non-final chunk and then the error with NULL
 * data, and a successful write reports NULL data.
 *
 * If no error was recorded but the channel is stopped (DIO_STOPPED), the
 * delivery reports ECANCELED and stores it in op->err.
 */
2234 static void
_dispatch_operation_deliver_data(dispatch_operation_t op,dispatch_op_flags_t flags)2235 _dispatch_operation_deliver_data(dispatch_operation_t op,
2236 		dispatch_op_flags_t flags)
2237 {
2238 	// Either called from stream resp. pick queue or when op is finalized
2239 	dispatch_data_t data = NULL;
2240 	int err = 0;
2241 	size_t undelivered = op->undelivered + op->buf_len;
2242 	bool deliver = (flags & (DOP_DELIVER|DOP_DONE)) ||
2243 			(op->flags & DOP_DELIVER);
2244 	op->flags = DOP_DEFAULT;
2245 	if (!deliver) {
2246 		// Don't deliver data until low water mark has been reached
2247 		if (undelivered >= op->params.low) {
2248 			deliver = true;
2249 		} else if (op->buf_len < op->buf_siz) {
2250 			// Request buffer is not yet used up
2251 			_dispatch_fd_debug("buffer data", op->fd_entry->fd);
2252 			return;
2253 		}
2254 	} else {
2255 		err = op->err;
2256 		if (!err && (op->channel->atomic_flags & DIO_STOPPED)) {
2257 			err = ECANCELED;
2258 			op->err = err;
2259 		}
2260 	}
2261 	// Deliver data or buffer used up
2262 	if (op->direction == DOP_DIR_READ) {
2263 		if (op->buf_len) {
			// Wrap the read buffer (freed by the data object) and append it
			// to the data accumulated so far
2264 			void *buf = op->buf;
2265 			data = dispatch_data_create(buf, op->buf_len, NULL,
2266 					DISPATCH_DATA_DESTRUCTOR_FREE);
2267 			op->buf = NULL;
2268 			op->buf_len = 0;
2269 			dispatch_data_t d = dispatch_data_create_concat(op->data, data);
2270 			_dispatch_io_data_release(op->data);
2271 			_dispatch_io_data_release(data);
2272 			data = d;
2273 		} else {
2274 			data = op->data;
2275 		}
		// When delivering, the reference to data moves to the handler block
		// below; otherwise op keeps accumulating into it
2276 		op->data = deliver ? dispatch_data_empty : data;
2277 	} else if (op->direction == DOP_DIR_WRITE) {
2278 		if (deliver) {
2279 			data = dispatch_data_create_subrange(op->data, op->buf_len,
2280 					op->length);
2281 		}
2282 		if (op->buf_data && op->buf_len == op->buf_siz) {
2283 			_dispatch_io_data_release(op->buf_data);
2284 			op->buf_data = NULL;
2285 			op->buf = NULL;
2286 			op->buf_len = 0;
2287 			// Trim newly written buffer from head of unwritten data
2288 			dispatch_data_t d;
2289 			if (deliver) {
2290 				_dispatch_io_data_retain(data);
2291 				d = data;
2292 			} else {
2293 				d = dispatch_data_create_subrange(op->data, op->buf_siz,
2294 						op->length);
2295 			}
2296 			_dispatch_io_data_release(op->data);
2297 			op->data = d;
2298 		}
2299 	} else {
2300 		dispatch_assert(op->direction < DOP_DIR_MAX);
2301 		return;
2302 	}
2303 	if (!deliver || ((flags & DOP_NO_EMPTY) && !dispatch_data_get_size(data))) {
2304 		op->undelivered = undelivered;
2305 		_dispatch_fd_debug("buffer data", op->fd_entry->fd);
2306 		return;
2307 	}
2308 	op->undelivered = 0;
2309 	_dispatch_object_debug(op, "%s", __func__);
2310 	_dispatch_fd_debug("deliver data", op->fd_entry->fd);
	// Copy everything the handler block needs out of op and retain the
	// channel and fd entry; the block releases them and data when done
2311 	dispatch_op_direction_t direction = op->direction;
2312 	dispatch_io_handler_t handler = op->handler;
2313 #if DISPATCH_IO_DEBUG
2314 	int fd = op->fd_entry->fd;
2315 #endif
2316 	dispatch_fd_entry_t fd_entry = op->fd_entry;
2317 	_dispatch_fd_entry_retain(fd_entry);
2318 	dispatch_io_t channel = op->channel;
2319 	_dispatch_retain(channel);
2320 	// Note that data delivery may occur after the operation is freed
2321 	dispatch_async(op->op_q, ^{
2322 		bool done = (flags & DOP_DONE);
2323 		dispatch_data_t d = data;
2324 		if (done) {
2325 			if (direction == DOP_DIR_READ && err) {
2326 				if (dispatch_data_get_size(d)) {
2327 					_dispatch_fd_debug("IO handler invoke", fd);
2328 					handler(false, d, 0);
2329 				}
2330 				d = NULL;
2331 			} else if (direction == DOP_DIR_WRITE && !err) {
2332 				d = NULL;
2333 			}
2334 		}
2335 		_dispatch_fd_debug("IO handler invoke", fd);
2336 		handler(done, d, err);
2337 		_dispatch_release(channel);
2338 		_dispatch_fd_entry_release(fd_entry);
2339 		_dispatch_io_data_release(data);
2340 	});
2341 }
2342 
2343 #pragma mark -
2344 #pragma mark dispatch_io_debug
2345 
2346 static size_t
_dispatch_io_debug_attr(dispatch_io_t channel,char * buf,size_t bufsiz)2347 _dispatch_io_debug_attr(dispatch_io_t channel, char* buf, size_t bufsiz)
2348 {
2349 	dispatch_queue_t target = channel->do_targetq;
2350 	return dsnprintf(buf, bufsiz, "type = %s, fd = 0x%x, %sfd_entry = %p, "
2351 			"queue = %p, target = %s[%p], barrier_queue = %p, barrier_group = "
2352 			"%p, err = 0x%x, low = 0x%zx, high = 0x%zx, interval%s = %zu ",
2353 			channel->params.type == DISPATCH_IO_STREAM ? "stream" : "random",
2354 			channel->fd_actual, channel->atomic_flags & DIO_STOPPED ?
2355 			"stopped, " : channel->atomic_flags & DIO_CLOSED ? "closed, " : "",
2356 			channel->fd_entry, channel->queue, target && target->dq_label ?
2357 			target->dq_label : "", target, channel->barrier_queue,
2358 			channel->barrier_group, channel->err, channel->params.low,
2359 			channel->params.high, channel->params.interval_flags &
2360 			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "",
2361 			channel->params.interval);
2362 }
2363 
2364 size_t
_dispatch_io_debug(dispatch_io_t channel,char * buf,size_t bufsiz)2365 _dispatch_io_debug(dispatch_io_t channel, char* buf, size_t bufsiz)
2366 {
2367 	size_t offset = 0;
2368 	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2369 			dx_kind(channel), channel);
2370 	offset += _dispatch_object_debug_attr(channel, &buf[offset],
2371 			bufsiz - offset);
2372 	offset += _dispatch_io_debug_attr(channel, &buf[offset], bufsiz - offset);
2373 	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2374 	return offset;
2375 }
2376 
2377 static size_t
_dispatch_operation_debug_attr(dispatch_operation_t op,char * buf,size_t bufsiz)2378 _dispatch_operation_debug_attr(dispatch_operation_t op, char* buf,
2379 		size_t bufsiz)
2380 {
2381 	dispatch_queue_t target = op->do_targetq;
2382 	dispatch_queue_t oqtarget = op->op_q ? op->op_q->do_targetq : NULL;
2383 	return dsnprintf(buf, bufsiz, "type = %s %s, fd = 0x%x, fd_entry = %p, "
2384 			"channel = %p, queue = %p -> %s[%p], target = %s[%p], "
2385 			"offset = %zd, length = %zu, done = %zu, undelivered = %zu, "
2386 			"flags = %u, err = 0x%x, low = 0x%zx, high = 0x%zx, "
2387 			"interval%s = %zu ", op->params.type == DISPATCH_IO_STREAM ?
2388 			"stream" : "random", op->direction == DOP_DIR_READ ? "read" :
2389 			"write", op->fd_entry ? op->fd_entry->fd : -1, op->fd_entry,
2390 			op->channel, op->op_q, oqtarget && oqtarget->dq_label ?
2391 			oqtarget->dq_label : "", oqtarget, target && target->dq_label ?
2392 			target->dq_label : "", target, op->offset, op->length, op->total,
2393 			op->undelivered + op->buf_len, op->flags, op->err, op->params.low,
2394 			op->params.high, op->params.interval_flags &
2395 			DISPATCH_IO_STRICT_INTERVAL ? "(strict)" : "", op->params.interval);
2396 }
2397 
2398 size_t
_dispatch_operation_debug(dispatch_operation_t op,char * buf,size_t bufsiz)2399 _dispatch_operation_debug(dispatch_operation_t op, char* buf, size_t bufsiz)
2400 {
2401 	size_t offset = 0;
2402 	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
2403 			dx_kind(op), op);
2404 	offset += _dispatch_object_debug_attr(op, &buf[offset], bufsiz - offset);
2405 	offset += _dispatch_operation_debug_attr(op, &buf[offset], bufsiz - offset);
2406 	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
2407 	return offset;
2408 }
2409