xref: /NextBSD/sys/compat/mach/ipc/ipc_mqueue.c (revision 56cae514be6eab01cc80ae59ab8e50759483a19d)
1 /*
2  * Copyright 1991-1998 by Open Software Foundation, Inc.
3  *              All Rights Reserved
4  *
5  * Permission to use, copy, modify, and distribute this software and
6  * its documentation for any purpose and without fee is hereby granted,
7  * provided that the above copyright notice appears in all copies and
8  * that both the copyright notice and this permission notice appear in
9  * supporting documentation.
10  *
11  * OSF DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE
12  * INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
13  * FOR A PARTICULAR PURPOSE.
14  *
15  * IN NO EVENT SHALL OSF BE LIABLE FOR ANY SPECIAL, INDIRECT, OR
16  * CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
17  * LOSS OF USE, DATA OR PROFITS, WHETHER IN ACTION OF CONTRACT,
18  * NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
19  * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20  */
21 /*
22  * MkLinux
23  */
24 /* CMU_HIST */
25 /*
26  * Revision 2.17.3.7  92/09/15  17:20:54  jeffreyh
27  * 	Added code to detect if a race condition was lost in the NORMA
28  * 	system  and our port is now a proxy (migrated out from under the
29  * 	thread), in which case we return MACH_RCV_PORT_CHANGED.
30  * 	[92/07/24            sjs]
31  *
32  * Revision 2.17.3.6  92/06/24  17:59:57  jeffreyh
33  * 	Allow norma_ipc_send to fail.
34  * 	[92/06/02            dlb]
35  *
36  * Revision 2.17.3.5  92/05/27  00:44:23  jeffreyh
37  * 	In ipc_mqueue_receive, check whether a kmsg got queued while we called
38  * 	netipc_replenish.
39  * 	[92/05/12            dlb]
40  *
41  * Revision 2.17.3.4.2.1  92/04/28  16:01:59  jeffreyh
42  * 	Fixed race condition in NORMA system for ipc_mqueue_send().
43  * 	[92/04/16            sjs]
44  *
45  * Revision 2.17.3.4  92/03/28  10:09:17  jeffreyh
46  * 	In error cases call norma_ipc_error_receiving instead
47  * 	of norma_ipc_finish receiving.  This may eat the kmsg.
48  * 	[92/03/20		dlb]
49  *
50  * Revision 2.17.3.3  92/03/03  16:18:53  jeffreyh
51  * 	Changes from TRUNK
52  * 	[92/02/26  11:51:40  jeffreyh]
53  *
54  * Revision 2.18  92/01/03  20:13:05  dbg
55  * 	Removed THREAD_SHOULD_TERMINATE.
56  * 	[91/12/19            dbg]
57  *
58  * Revision 2.17.3.2  92/01/21  21:50:26  jsb
59  * 	Picked up hack from dlb@osf.org to call norma_ipc_finish_receiving
60  * 	before ipc_kmsg_destroy. The real fix is to use uncopyout_to_network.
61  * 	[92/01/17  14:35:03  jsb]
62  *
63  * Revision 2.17.3.1  92/01/03  16:35:24  jsb
64  * 	Removed spurious arguments to norma_ipc_send.
65  * 	Options and timeout will be handled here, not by norma_ipc_send.
66  * 	[91/12/26  19:51:59  jsb]
67  *
68  * 	Corrected log.
69  * 	[91/12/24  14:15:11  jsb]
70  *
71  * Revision 2.17  91/12/15  10:40:33  jsb
72  * 	Added norma_ipc_finish_receiving call to support large in-line msgs.
73  *
74  * Revision 2.16  91/12/14  14:27:10  jsb
75  * 	Removed ipc_fields.h hack.
76  *
77  * Revision 2.15  91/11/14  16:56:07  rpd
78  * 	Picked up mysterious norma changes.
79  * 	[91/11/14            rpd]
80  *
81  * Revision 2.14  91/08/28  11:13:34  jsb
82  * 	Added seqno argument to ipc_mqueue_receive.
83  * 	Also added seqno processing to ipc_mqueue_send, ipc_mqueue_move.
84  * 	[91/08/10            rpd]
85  * 	Fixed norma_ipc_handoff technology.
86  * 	Renamed clport things to norma_ipc things.
87  * 	[91/08/15  08:23:17  jsb]
88  *
89  * Revision 2.13  91/08/03  18:18:27  jsb
90  * 	Renamed replenish routine.
91  * 	[91/08/01  23:00:06  jsb]
92  *
93  * 	Removed obsolete include.
94  * 	Added option, timeout parameters to ipc_clport_send.
95  * 	[91/07/17  14:04:15  jsb]
96  *
97  * Revision 2.12  91/06/25  10:27:34  rpd
98  * 	Added some wait_result assertions.
99  * 	[91/05/30            rpd]
100  *
101  * Revision 2.11  91/06/17  15:46:18  jsb
102  * 	Renamed NORMA conditionals.
103  * 	[91/06/17  10:44:39  jsb]
104  *
105  * Revision 2.10  91/06/06  17:06:06  jsb
106  * 	Added call to ip_unlock after calling ipc_clport_send.
107  * 	Added support for clport handoff.
108  * 	[91/06/06  16:05:12  jsb]
109  *
110  * Revision 2.9  91/05/14  16:33:58  mrt
111  * 	Correcting copyright
112  *
113  * Revision 2.8  91/03/16  14:48:18  rpd
114  * 	Renamed ipc_thread_{go,will_wait,will_wait_with_timeout}
115  * 	to thread_{go,will_wait,will_wait_with_timeout}.
116  * 	Replaced ipc_thread_block with thread_block.
117  * 	[91/02/17            rpd]
118  *
119  * Revision 2.7  91/02/05  17:22:24  mrt
120  * 	Changed to new Mach copyright
121  * 	[91/02/01  15:46:33  mrt]
122  *
123  * Revision 2.6  91/01/08  15:14:35  rpd
124  * 	Changed continuation argument to (void (*)()).
125  * 	[90/12/18            rpd]
126  * 	Reorganized ipc_mqueue_receive.
127  * 	[90/11/22            rpd]
128  *
129  * 	Minor cleanup.
130  * 	[90/11/11            rpd]
131  *
132  * Revision 2.5  90/12/14  11:02:32  jsb
133  * 	Changed parameters in ipc_clport_send call.
134  * 	[90/12/13  21:20:13  jsb]
135  *
136  * Revision 2.4  90/11/05  14:29:04  rpd
137  * 	Use new io_reference and io_release.
138  * 	Use new ip_reference and ip_release.
139  * 	[90/10/29            rpd]
140  *
141  * Revision 2.3  90/09/28  16:54:58  jsb
142  * 	Added NORMA_IPC support.
143  * 	[90/09/28  14:03:24  jsb]
144  *
145  * Revision 2.2  90/06/02  14:50:39  rpd
146  * 	Created for new IPC.
147  * 	[90/03/26  20:57:06  rpd]
148  */
149 /* CMU_ENDHIST */
150 /*
151  * Mach Operating System
152  * Copyright (c) 1991,1990,1989 Carnegie Mellon University
153  * All Rights Reserved.
154  *
155  * Permission to use, copy, modify and distribute this software and its
156  * documentation is hereby granted, provided that both the copyright
157  * notice and this permission notice appear in all copies of the
158  * software, derivative works or modified versions, and any portions
159  * thereof, and that both notices appear in supporting documentation.
160  *
161  * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
162  * CONDITION.  CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
163  * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
164  *
165  * Carnegie Mellon requests users of this software to return to
166  *
167  *  Software Distribution Coordinator  or  Software.Distribution@CS.CMU.EDU
168  *  School of Computer Science
169  *  Carnegie Mellon University
170  *  Pittsburgh PA 15213-3890
171  *
172  * any improvements or extensions that they make and grant Carnegie Mellon
173  * the rights to redistribute these changes.
174  */
175 /*
176  */
177 /*
178  *	File:	ipc/ipc_mqueue.c
179  *	Author:	Rich Draves
180  *	Date:	1989
181  *
182  *	Functions to manipulate IPC message queues.
183  */
184 
185 
186 #include <sys/mach/port.h>
187 #include <sys/mach/message.h>
188 #include <sys/mach/ipc_kobject.h>
189 
190 #include <sys/mach/ipc/ipc_mqueue.h>
191 #include <sys/mach/ipc/ipc_thread.h>
192 #include <sys/mach/ipc/ipc_kmsg.h>
193 #include <sys/mach/ipc/ipc_port.h>
194 #include <sys/mach/ipc/ipc_pset.h>
195 #include <sys/mach/ipc/ipc_space.h>
196 #include <sys/mach/thread.h>
197 
198 
199 /*
200  *	Routine:	ipc_mqueue_init
201  *	Purpose:
202  *		Initialize a newly-allocated message queue.
203  */
204 
205 void
ipc_mqueue_init(ipc_mqueue_t mqueue)206 ipc_mqueue_init(
207 	ipc_mqueue_t	mqueue)
208 {
209 	ipc_kmsg_queue_init(&mqueue->imq_messages);
210 }
211 
212 /*
213  *	Routine:	ipc_mqueue_send
214  *	Purpose:
215  *		Send a message to a port.  The message holds a reference
216  *		for the destination port in the msgh_remote_port field.
217  *
218  *		If unsuccessful, the caller still has possession of
219  *		the message and must do something with it.  If successful,
220  *		the message is queued, given to a receiver, destroyed,
221  *		or handled directly by the kernel via mach_msg.
222  *	Conditions:
223  *		Nothing locked.
224  *	Returns:
225  *		MACH_MSG_SUCCESS	The message was accepted.
226  *		MACH_SEND_TIMED_OUT	Caller still has message.
227  *		MACH_SEND_INTERRUPTED	Caller still has message.
228  */
229 
230 mach_msg_return_t
ipc_mqueue_send(ipc_kmsg_t kmsg,mach_msg_option_t option,mach_msg_timeout_t timeout)231 ipc_mqueue_send(
232 	ipc_kmsg_t		kmsg,
233 	mach_msg_option_t	option,
234 	mach_msg_timeout_t	timeout)
235 {
236 	ipc_port_t port;
237 	kern_return_t           save_wait_result;
238 	ipc_thread_t self = current_thread();
239 
240 	port = (ipc_port_t) kmsg->ikm_header->msgh_remote_port;
241 	assert(IP_VALID(port));
242 	MACH_VERIFY(io_otype((ipc_object_t)port) == IOT_PORT, ("bad type %d\n", io_otype((ipc_object_t)port)));
243 	assert(io_otype((ipc_object_t)port) == IOT_PORT);
244 
245 	ip_lock(port);
246 
247 	if (port->ip_receiver == ipc_space_kernel) {
248 		ipc_kmsg_t reply;
249 
250 		/*
251 		 *	We can check ip_receiver == ipc_space_kernel
252 		 *	before checking that the port is active because
253 		 *	ipc_port_dealloc_kernel clears ip_receiver
254 		 *	before destroying a kernel port.
255 		 */
256 
257 		assert(ip_active(port));
258 		ip_unlock(port);
259 
260 		reply = ipc_kobject_server(kmsg);
261 		if (reply != IKM_NULL) {
262 			self->ith_kmsg =  reply;
263 			self->ith_object = (ipc_object_t)port;
264 			ip_lock(port);
265 			self->ith_seqno = port->ip_seqno++;
266 			ip_unlock(port);
267 			/* ipc_mqueue_send_always(reply); */
268 		}
269 #ifdef INVARIANTS
270 		else
271 			printf("reply from kobject == NULL\n");
272 #endif
273 		return MACH_MSG_SUCCESS;
274 	}
275 
276 	if (kmsg->ikm_header->msgh_bits & MACH_MSGH_BITS_CIRCULAR) {
277 		ip_unlock(port);
278 
279 		/* don't allow the creation of a circular loop */
280 
281 		ipc_kmsg_destroy(kmsg);
282 		return MACH_MSG_SUCCESS;
283 	}
284 
285 	for (;;) {
286 		ipc_thread_t self;
287 
288 		/*
289 		 *	Can't deliver to a dead port.
290 		 *	However, we can pretend it got sent
291 		 *	and was then immediately destroyed.
292 		 */
293 
294 		if (!ip_active(port)) {
295 			/*
296 			 *	We can't let ipc_kmsg_destroy deallocate
297 			 *	the port right, because we might end up
298 			 *	in an infinite loop trying to deliver
299 			 *	a send-once notification.
300 			 */
301 
302 			ip_unlock(port);
303 			ip_release(port);
304 			kmsg->ikm_header->msgh_remote_port = MACH_PORT_NULL;
305 			ipc_kmsg_destroy(kmsg);
306 			return MACH_MSG_SUCCESS;
307 		}
308 
309 		/*
310 		 *  Don't block if:
311 		 *	1) We're under the queue limit.
312 		 *	2) Caller used the MACH_SEND_ALWAYS internal option.
313 		 *	3) Message is sent to a send-once right.
314 		 */
315 
316 		if ((port->ip_msgcount < port->ip_qlimit) ||
317 		    (option & MACH_SEND_ALWAYS) ||
318 		    (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
319 						MACH_MSG_TYPE_PORT_SEND_ONCE))
320 			break;
321 
322 		/* must block waiting for queue to clear */
323 
324 		self = current_thread();
325 
326 		if (option & MACH_SEND_TIMEOUT) {
327 			if (timeout == 0) {
328 				ip_unlock(port);
329 				return MACH_SEND_TIMED_OUT;
330 			}
331 			thread_will_wait_with_timeout(self, timeout);
332 		} else {
333 			thread_will_wait(self);
334 		}
335 
336 		counter(c_ipc_mqueue_send_block++);
337 		self->ith_state = MACH_SEND_IN_PROGRESS;
338 		self->ith_block_lock_data = &port->port_comm.rcd_io_lock_data;
339 		ipc_thread_enqueue(&port->ip_blocked, self);
340 		thread_block();
341 
342 		/* Save proper wait_result in case we block */
343 		save_wait_result = self->wait_result;
344 
345 		/* why did we wake up? - finish_receive will remove us from the queue */
346 
347 		if (self->ith_state == MACH_MSG_SUCCESS)
348 			continue;
349 		ipc_thread_rmqueue(&port->ip_blocked, self);
350 		assert(self->ith_state == MACH_SEND_IN_PROGRESS);
351 
352 		/*
353 		 *	Thread wakeup-reason field tells us why
354 		 *	the wait was interrupted.
355 		 */
356 
357 		switch (save_wait_result) {
358 		    case THREAD_INTERRUPTED:
359 			/* send was interrupted - give up */
360 
361 			ip_unlock(port);
362 			return MACH_SEND_INTERRUPTED;
363 
364 		    case THREAD_TIMED_OUT:
365 			/* timeout expired */
366 
367 			assert(option & MACH_SEND_TIMEOUT);
368 			timeout = 0;
369 			break;
370 
371 		    case THREAD_RESTART:
372 		    default:
373 				panic("ipc_mqueue_send %d", save_wait_result);
374 		}
375 	}
376 	(void) ipc_mqueue_deliver(port, kmsg, TRUE);
377 
378 	return MACH_MSG_SUCCESS;
379 }
380 
381 
382 /*
383  * ipc_mqueue_deliver: give the kmsg to a waiting receiver or else enqueue
384  * it on the port or pset.
385  *
386  * ipc_mqueue_deliver used to be the last block of code in ipc_mqueue_send.
387  * It is split out here so that the same code can be used to deliver DIPC
388  * kmsgs or meta_kmsgs.
389  *
390  * The port must be locked on entry; it will be unlocked on exit.  This
391  * routine CAN NOT FAIL when called from thread context.  However, from
392  * interrupt context it may not be able to complete its duties due to
393  * lock contention.
394  *
395  * Returns TRUE on successful delivery.
396  */
397 #if	TRACE_BUFFER
398 int	tr_ipc_mqueue_deliver = 0;
399 #define	TR_IPC_MQEN(fmt, port, kmsg, thread_context)			\
400 	tr_start();							\
401 	if (tr_ipc_mqueue_deliver)					\
402 		tr4((fmt), (port), (kmsg), (thread_context));
403 #define	TR_IPC_MQEX(fmt, kmsg)						\
404 	if (tr_ipc_mqueue_deliver)					\
405 		tr2((fmt), (kmsg));					\
406 	tr_stop();
407 #else	/* TRACE_BUFFER */
408 #define	TR_IPC_MQEN(fmt, port, kmsg, thread_context)
409 #define	TR_IPC_MQEX(fmt, kmsg)
410 #endif	/* TRACE_BUFFER */
411 
412 
413 static void
ipc_mqueue_run(thread_act_t receiver,ipc_mqueue_t mqueue,ipc_kmsg_t kmsg,ipc_port_t port)414 ipc_mqueue_run(thread_act_t receiver, ipc_mqueue_t mqueue, ipc_kmsg_t kmsg, ipc_port_t port)
415 {
416 	MPASS(receiver->ith_state == MACH_RCV_IN_PROGRESS ||
417 		  receiver->ith_state == MACH_RCV_IN_PROGRESS_TIMED);
418 	receiver->ith_state = MACH_MSG_SUCCESS;
419 	receiver->ith_kmsg = kmsg;
420 	receiver->ith_object = (ipc_object_t)port;
421 	receiver->ith_seqno = port->ip_seqno++;
422 	ip_unlock(port);
423 	thread_go(receiver);
424 }
425 
426 mach_msg_return_t
ipc_mqueue_deliver(register ipc_port_t port,register ipc_kmsg_t kmsg,boolean_t thread_context)427 ipc_mqueue_deliver(
428 	register ipc_port_t	port,
429 	register ipc_kmsg_t	kmsg,
430 	boolean_t		thread_context)
431 {
432 	ipc_mqueue_t mqueue;
433 	ipc_pset_t pset;
434 	ipc_thread_t receiver;
435 	TR_DECL("ipc_mqueue_deliver");
436 
437 	TR_IPC_MQEN("enter: port 0x%x kmsg 0x%x thd_ctxt %d", port, kmsg,
438 		    thread_context);
439 
440 	assert(IP_VALID(port));
441 	assert(ip_active(port));
442 
443 	pset = port->ip_pset;
444 	mqueue = &port->ip_messages;
445 	receiver = NULL;
446 
447     /* first we check the the port and portset for waiters */
448 	if (pset != NULL) {
449 		ips_lock(pset);
450 		receiver = thread_pool_get_act((ipc_object_t)pset, 0);
451 		ips_unlock(pset);
452 	} else if (receiver == NULL) {
453 		receiver = thread_pool_get_act((ipc_object_t)port, 0);
454 	}
455 	/* we have a receiver - we're done */
456 	if (receiver != NULL) {
457 		ipc_mqueue_run(receiver, mqueue, kmsg, port);
458 		return (MACH_MSG_SUCCESS);
459 	}
460 
461 	assert(port->ip_msgcount >= 0);
462 	ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
463 	port->ip_msgcount++;
464 	ip_unlock(port);
465 
466 	if (pset)
467 		ipc_pset_signal(pset);
468 
469 	TR_IPC_MQEX("exit: wakeup 0x%x", receiver);
470 	return MACH_MSG_SUCCESS;
471 }
472 
473 
474 /*
475  *	Routine:	ipc_mqueue_copyin
476  *	Purpose:
477  *		Convert a name in a space to a message queue.
478  *	Conditions:
479  *		Nothing locked.  If successful, the message queue
480  *		is returned locked and caller gets a ref for the object.
481  *		This ref ensures the continued existence of the queue.
482  *	Returns:
483  *		MACH_MSG_SUCCESS	Found a message queue.
484  *		MACH_RCV_INVALID_NAME	The space is dead.
485  *		MACH_RCV_INVALID_NAME	The name doesn't denote a right.
486  *		MACH_RCV_INVALID_NAME
487  *			The denoted right is not receive or port set.
488  *		MACH_RCV_IN_SET		Receive right is a member of a set.
489  */
490 
491 mach_msg_return_t
ipc_mqueue_copyin(ipc_space_t space,mach_port_name_t name,ipc_entry_bits_t * bitsp,ipc_object_t * objectp)492 ipc_mqueue_copyin(
493 	ipc_space_t	space,
494 	mach_port_name_t	name,
495 	ipc_entry_bits_t *bitsp,
496 	ipc_object_t	*objectp)
497 {
498 	ipc_entry_t entry;
499 	ipc_entry_bits_t bits;
500 	ipc_object_t object;
501 	mach_msg_return_t mr;
502 
503 	is_read_lock(space);
504 	if (!space->is_active) {
505 		mr = MACH_RCV_INVALID_NAME;
506 		goto error;
507 	}
508 
509 	entry = ipc_entry_lookup(space, name);
510 	if (entry == IE_NULL) {
511 		mr = MACH_RCV_INVALID_NAME;
512 		goto error;
513 	}
514 
515 	bits = entry->ie_bits;
516 	object = entry->ie_object;
517 	ipc_object_reference(object);
518 
519 	if (bits & MACH_PORT_TYPE_RECEIVE) {
520 		ipc_port_t port = NULL;
521 
522 		port = (ipc_port_t) object;
523 		assert(port != IP_NULL);
524 
525 		assert(ip_active(port));
526 		assert(port->ip_receiver_name == name);
527 		assert(port->ip_receiver == space);
528 		is_read_unlock(space);
529 	} else if (bits & MACH_PORT_TYPE_PORT_SET) {
530 		ipc_pset_t pset;
531 
532 		pset = (ipc_pset_t) object;
533 		assert(pset != IPS_NULL);
534 
535 		assert(ips_active(pset));
536 		assert(pset->ips_local_name == name);
537 		is_read_unlock(space);
538 	} else {
539 		ipc_object_release(object);
540 		mr = MACH_RCV_INVALID_NAME;
541 		goto error;
542 	}
543 
544 	/*
545 	 *	At this point, the object is locked and active,
546 	 *	the space is unlocked, and mqueue is initialized.
547 	 */
548 
549 	*objectp = object;
550 	*bitsp = bits;
551 	return MACH_MSG_SUCCESS;
552 error:
553 	is_read_unlock(space);
554 	return (mr);
555 }
556 
557 
558 static int
ipc_mqueue_receive_error(ipc_thread_t self,int save_wait_result,int option)559 ipc_mqueue_receive_error(ipc_thread_t self, int save_wait_result, int option)
560 {
561 	switch (self->ith_state) {
562 	case MACH_RCV_PORT_DIED:
563 	case MACH_RCV_PORT_CHANGED:
564 		/* something bad happened to the port/set */
565 		return self->ith_state;
566 	case MACH_RCV_IN_PROGRESS:
567 	case MACH_RCV_IN_PROGRESS_TIMED:
568 		/*
569 		 *	Awakened for other than IPC completion.
570 		 *	Remove ourself from the waiting queue,
571 		 *	then check the wakeup cause.
572 		 */
573 		if (self->ith_active) {
574 			thread_pool_remove(self);
575 			self->ith_block_lock_data = NULL;
576 			self->ith_active = 0;
577 		}
578 		switch (save_wait_result) {
579 		case THREAD_INTERRUPTED:
580 			/* receive was interrupted - give up */
581 			return MACH_RCV_INTERRUPTED;
582 		case THREAD_TIMED_OUT:
583 			/* timeout expired */
584 			assert(option & MACH_RCV_TIMEOUT);
585 			assert(self->ith_state == MACH_RCV_IN_PROGRESS_TIMED);
586 			return (MACH_RCV_TIMED_OUT);
587 		case THREAD_RESTART:
588 		default:
589 			panic("ipc_mqueue_receive: bad wait_result");
590 		}
591 		break;
592 
593 	default:
594 		panic("ipc_mqueue_receive: strange ith_state");
595 	}
596 }
597 
598 static void
ipc_mqueue_post_on_thread(ipc_port_t port,mach_msg_option_t option,mach_msg_size_t max_size,thread_t thread)599 ipc_mqueue_post_on_thread(
600 	ipc_port_t			port,
601 	mach_msg_option_t	option,
602 	mach_msg_size_t		max_size,
603 	thread_t                thread)
604 {
605 	ipc_kmsg_t kmsg;
606 	ipc_mqueue_t 	mqueue = &port->ip_messages;
607 
608 	mach_msg_return_t mr = MACH_MSG_SUCCESS;
609 	mach_msg_size_t rcv_size;
610 	vm_map_t map = current_map();
611 
612 	/*
613 	 * Do some sanity checking of our ability to receive
614 	 * before pulling the message off the queue.
615 	 */
616 	kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
617 	assert(kmsg != IKM_NULL);
618 
619 	/*
620 	 * If we really can't receive it, but we had the
621 	 * MACH_RCV_LARGE option set, then don't take it off
622 	 * the queue, instead return the appropriate error
623 	 * (and size needed).
624 	 */
625 	rcv_size = ipc_kmsg_copyout_size(kmsg, map);
626 	if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
627 		mr = MACH_RCV_TOO_LARGE;
628 		if (option & MACH_RCV_LARGE) {
629 			thread->ith_receiver_name = port->ip_receiver_name;
630 			thread->ith_kmsg = IKM_NULL;
631 			thread->ith_msize = rcv_size;
632 			thread->ith_seqno = 0;
633 			thread->ith_state = mr;
634 			return;
635 		}
636 	}
637 
638 	ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
639 	assert(port->ip_msgcount > 0);
640 	port->ip_msgcount--;
641 
642 	thread->ith_object = (ipc_object_t)port;
643 	thread->ith_seqno = port->ip_seqno++;
644 	thread->ith_kmsg = kmsg;
645 	thread->ith_state = mr;
646 
647 	current_task()->messages_received++;
648 }
649 
650 mach_msg_return_t
ipc_mqueue_pset_receive(natural_t bits,mach_msg_option_t option,mach_msg_size_t max_size,mach_msg_timeout_t timeout,thread_t thread)651 ipc_mqueue_pset_receive(
652 	natural_t	bits,
653 	mach_msg_option_t	option,
654 	mach_msg_size_t		max_size,
655 	mach_msg_timeout_t	timeout,
656 	thread_t thread)
657 {
658 	ipc_port_t port;
659 	ipc_pset_t pset;
660 
661 	pset = (ipc_pset_t)thread->ith_object;
662 	assert(io_otype(thread->ith_object) == IOT_PORT_SET);
663 restart:
664 	TAILQ_FOREACH(port, &pset->ips_ports, ip_next) {
665 		mtx_assert(&port->port_comm.rcd_io_lock_data, MA_NOTOWNED);
666 		assert (port->ip_msgcount >= 0);
667 		if (port->ip_msgcount != 0) {
668 			if (ip_lock_try(port) == 0) {
669 				ips_unlock(pset);
670 				ip_lock(port);
671 				ips_lock(pset);
672 			}
673 			/* one way or another we have the lock */
674 			break;
675 		}
676 	}
677 	if (port != NULL && port->ip_msgcount == 0) {
678 		mtx_assert(&port->port_comm.rcd_io_lock_data, MA_OWNED);
679 		ip_unlock(port);
680 		goto restart;
681 	}
682 	if (port != NULL) {
683 		mtx_assert(&port->port_comm.rcd_io_lock_data, MA_OWNED);
684 		ipc_mqueue_post_on_thread(port, option, max_size, thread);
685 		ip_unlock(port);
686 		thread->ith_object = (ipc_object_t)port;
687 		return (THREAD_NOT_WAITING);
688 	}
689 	if ((option & MACH_RCV_TIMEOUT) && (timeout == 0)) {
690 		thread->ith_state = MACH_RCV_TIMED_OUT;
691 		return (THREAD_NOT_WAITING);
692 	}
693 
694 	return (THREAD_WAITING);
695 }
696 
697 /*
698  *	Routine:	ipc_mqueue_receive
699  *	Purpose:
700  *		Receive a message from a message queue.
701  *
702  *	Conditions:
703  *		The message queue is locked; it will be returned unlocked.
704  *
705  *		Our caller must hold a reference for the port or port set
706  *		to which this queue belongs, to keep the queue
707  *		from being deallocated.  Furthermore, the port or set
708  *		must have been active when the queue was locked.
709  *
710  *		The kmsg is returned with clean header fields
711  *		and with the circular bit turned off.
712  *	Returns:
713  *		MACH_MSG_SUCCESS	Message returned in kmsgp.
714  *		MACH_RCV_TOO_LARGE	Message size returned in kmsgp.
715  *		MACH_RCV_TIMED_OUT	No message obtained.
716  *		MACH_RCV_INTERRUPTED	No message obtained.
717  *		MACH_RCV_PORT_DIED	Port/set died; no message.
718  *		MACH_RCV_PORT_CHANGED	Port moved into set; no msg.
719  *
720  */
721 
722 
723 mach_msg_return_t
ipc_mqueue_receive(natural_t bits,mach_msg_option_t option,mach_msg_size_t max_size,mach_msg_timeout_t timeout,ipc_kmsg_t * kmsgp,mach_port_seqno_t * seqnop,thread_t thread)724 ipc_mqueue_receive(
725 	natural_t	bits,
726 	mach_msg_option_t	option,
727 	mach_msg_size_t		max_size,
728 	mach_msg_timeout_t	timeout,
729 	ipc_kmsg_t		*kmsgp,
730 	mach_port_seqno_t	*seqnop,
731 	thread_t thread)
732 {
733 	ipc_port_t port;
734 	ipc_pset_t pset;
735 	ipc_kmsg_t kmsg;
736 	ipc_mqueue_t mqueue;
737 	mach_port_seqno_t seqno;
738 	mach_msg_return_t mr;
739 	ipc_kmsg_queue_t kmsgs;
740 	thread_t self;
741 	kern_return_t	save_wait_result;
742 	int rc;
743 
744 	/* logic currently too confused to support anything else */
745 	MPASS(thread == current_thread());
746 	MPASS(thread->ith_object != NULL);
747 	assert(io_otype(thread->ith_object) == IOT_PORT || io_otype(thread->ith_object) == IOT_PORT_SET);
748 	self = thread;
749 	pset = NULL;
750 
751 	io_lock(thread->ith_object);
752 	io_reference(thread->ith_object);
753 	if (thread->ith_kmsg != NULL) {
754 		thread->ith_state = MACH_MSG_SUCCESS;
755 		goto rx_done;
756 	}
757 
758 	if (bits & MACH_PORT_TYPE_PORT_SET) {
759 		pset = (ipc_pset_t)thread->ith_object;
760 
761 		rc = ipc_mqueue_pset_receive(bits, option, max_size, timeout, thread);
762 		if (rc == THREAD_NOT_WAITING) {
763 			if (thread->ith_state == MACH_RCV_TIMED_OUT || thread->ith_state == MACH_RCV_TOO_LARGE) {
764 				ips_unlock(pset);
765 				ips_release(pset);
766 				return (thread->ith_state);
767 			} else {
768 				kmsg = thread->ith_kmsg;
769 				seqno = thread->ith_seqno;
770 				MPASS(pset != (ipc_pset_t)thread->ith_object);
771 				/* drop passed in pset lock and acquire the port lock */
772 				ips_unlock(pset);
773 				ips_release(pset);
774 				pset = NULL;
775 
776 				io_lock(thread->ith_object);
777 				io_reference(thread->ith_object);
778 				goto rx_done;
779 			}
780 		}
781 		assert(io_otype(thread->ith_object) == IOT_PORT_SET);
782 	} else {
783 		port = (ipc_port_t)thread->ith_object;
784 		assert(port->ip_msgcount >= 0);
785 		mqueue = &port->ip_messages;
786 		kmsgs = &mqueue->imq_messages;
787 		kmsg = ipc_kmsg_queue_first(kmsgs);
788 		/* a message is already on the queue */
789 		if (kmsg != IKM_NULL) {
790 			ipc_mqueue_post_on_thread(port, option, max_size, thread);
791 			if (thread->ith_state == MACH_MSG_SUCCESS)
792 				goto rx_done;
793 			else {
794 				io_unlock(thread->ith_object);
795 				io_release(thread->ith_object);
796 				return (thread->ith_state);
797 			}
798 		}
799 	}
800 
801 	/* must block waiting for a message */
802 	if (option & MACH_RCV_TIMEOUT) {
803 		if (timeout == 0) {
804 			return MACH_RCV_TIMED_OUT;
805 		}
806 
807 		self->ith_state = MACH_RCV_IN_PROGRESS_TIMED;
808 	} else {
809 		self->ith_state = MACH_RCV_IN_PROGRESS;
810 		timeout = 0;
811 	}
812 	thread_will_wait_with_timeout(self, timeout);
813 
814 	self->ith_active = 1;
815 	self->ith_block_lock_data = &((rpc_common_t)(self->ith_object))->rcd_io_lock_data;
816 	thread_pool_put_act(self);
817 
818 	self->ith_msize = max_size;
819 	thread_block();
820 	/* Save proper wait_result in case we block */
821 	save_wait_result = self->wait_result;
822 
823 	/* why did we wake up? */
824 	if (self->ith_state != MACH_MSG_SUCCESS)
825 		goto error;
826 
827 	if (pset) {
828 		ips_unlock(pset);
829 		ips_release(pset);
830 		io_reference(self->ith_object);
831 		io_lock(self->ith_object);
832 	}
833 
834 rx_done:
835 	assert(io_otype(self->ith_object) == IOT_PORT);
836 	assert(self->ith_kmsg != NULL);
837 	*kmsgp = self->ith_kmsg;
838 	*seqnop = self->ith_seqno;
839 	port = (ipc_port_t)thread->ith_object;
840 
841 	assert(io_otype(self->ith_object) == IOT_PORT);
842 
843 	mr = ipc_mqueue_finish_receive(kmsgp, port, option, max_size);
844 
845 	io_unlock(self->ith_object);
846 	io_release(self->ith_object);
847 	self->ith_kmsg = NULL;
848 	self->ith_object = NULL;
849 	return (mr);
850 error:
851 	mr = ipc_mqueue_receive_error(self, save_wait_result, option);
852 
853 	io_unlock(self->ith_object);
854 	io_release(self->ith_object);
855 	self->ith_kmsg = NULL;
856 	self->ith_object = NULL;
857 	return (mr);
858 }
859 
860 
861 mach_msg_return_t
ipc_mqueue_finish_receive(ipc_kmsg_t * kmsgp,ipc_port_t port,mach_msg_option_t option,mach_msg_size_t max_size)862 ipc_mqueue_finish_receive(
863 	ipc_kmsg_t		*kmsgp,
864 	ipc_port_t		port,
865 	mach_msg_option_t	option,
866 	mach_msg_size_t		max_size)
867 {
868 	ipc_kmsg_t		kmsg;
869 	mach_msg_return_t	mr;
870 	ipc_thread_t 		self = current_thread();
871 	mach_msg_size_t rcv_size;
872 
873 	mr = MACH_MSG_SUCCESS;
874 	kmsg = *kmsgp;
875 	/* check sizes */
876 
877 	rcv_size = ipc_kmsg_copyout_size(kmsg, thread_map(self));
878 	if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
879 		/* the receive buffer isn't large enough */
880 		if (mach_debug_enable) {
881 			printf("%s max_size=%d REQUESTED_TRAILER_SIZE(option=%d)=%d\n",
882 				   curthread->td_proc->p_comm, max_size, option, REQUESTED_TRAILER_SIZE(option));
883 			/* ipc_kmsg_print(kmsg); */
884 			printf("rcv_size=%d\n", rcv_size);
885 		}
886 		mr = MACH_RCV_TOO_LARGE;
887 	} else if (self->ith_scatter_list != MACH_MSG_BODY_NULL) {
888 		/* verify the scatter list */
889 		mr = ipc_kmsg_check_scatter(kmsg,
890 					self->ith_option,
891 					&self->ith_scatter_list,
892 					&self->ith_scatter_list_size);
893 	}
894 
895 	if (mr == MACH_MSG_SUCCESS) {
896 		assert((kmsg->ikm_header->msgh_bits & MACH_MSGH_BITS_CIRCULAR)
897 									== 0);
898 	}
899 
900 	if (ip_active(port)) {
901 		ipc_thread_queue_t senders;
902 		ipc_thread_t sender;
903 
904 		assert(port->ip_msgcount >= 0);
905 		senders = &port->ip_blocked;
906 		sender = ipc_thread_queue_first(senders);
907 
908 		MPASS(sender == NULL || sender->ith_state == MACH_SEND_IN_PROGRESS);
909 
910 		if ((sender != ITH_NULL) &&
911 			(port->ip_msgcount < port->ip_qlimit)) {
912 			ipc_thread_rmqueue(senders, sender);
913 			sender->ith_state = MACH_MSG_SUCCESS;
914 			thread_go(sender);
915 		}
916 	}
917 	*kmsgp = kmsg;
918 	return mr;
919 }
920