1 /*
2 * Copyright 1991-1998 by Open Software Foundation, Inc.
3 * All Rights Reserved
4 *
5 * Permission to use, copy, modify, and distribute this software and
6 * its documentation for any purpose and without fee is hereby granted,
7 * provided that the above copyright notice appears in all copies and
8 * that both the copyright notice and this permission notice appear in
9 * supporting documentation.
10 *
11 * OSF DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE
12 * INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
13 * FOR A PARTICULAR PURPOSE.
14 *
15 * IN NO EVENT SHALL OSF BE LIABLE FOR ANY SPECIAL, INDIRECT, OR
16 * CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
17 * LOSS OF USE, DATA OR PROFITS, WHETHER IN ACTION OF CONTRACT,
18 * NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
19 * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 */
21 /*
22 * MkLinux
23 */
24 /* CMU_HIST */
25 /*
26 * Revision 2.17.3.7 92/09/15 17:20:54 jeffreyh
27 * Added code to detect if a race condition was lost in the NORMA
28 * system and our port is now a proxy (migrated out from under the
29 * thread), in which case we return MACH_RCV_PORT_CHANGED.
30 * [92/07/24 sjs]
31 *
32 * Revision 2.17.3.6 92/06/24 17:59:57 jeffreyh
33 * Allow norma_ipc_send to fail.
34 * [92/06/02 dlb]
35 *
36 * Revision 2.17.3.5 92/05/27 00:44:23 jeffreyh
37 * In ipc_mqueue_receive, check whether a kmsg got queued while we called
38 * netipc_replenish.
39 * [92/05/12 dlb]
40 *
41 * Revision 2.17.3.4.2.1 92/04/28 16:01:59 jeffreyh
42 * Fixed race condition in NORMA system for ipc_mqueue_send().
43 * [92/04/16 sjs]
44 *
45 * Revision 2.17.3.4 92/03/28 10:09:17 jeffreyh
46 * In error cases call norma_ipc_error_receiving instead
47 * of norma_ipc_finish receiving. This may eat the kmsg.
48 * [92/03/20 dlb]
49 *
50 * Revision 2.17.3.3 92/03/03 16:18:53 jeffreyh
51 * Changes from TRUNK
52 * [92/02/26 11:51:40 jeffreyh]
53 *
54 * Revision 2.18 92/01/03 20:13:05 dbg
55 * Removed THREAD_SHOULD_TERMINATE.
56 * [91/12/19 dbg]
57 *
58 * Revision 2.17.3.2 92/01/21 21:50:26 jsb
59 * Picked up hack from dlb@osf.org to call norma_ipc_finish_receiving
60 * before ipc_kmsg_destroy. The real fix is to use uncopyout_to_network.
61 * [92/01/17 14:35:03 jsb]
62 *
63 * Revision 2.17.3.1 92/01/03 16:35:24 jsb
64 * Removed spurious arguments to norma_ipc_send.
65 * Options and timeout will be handled here, not by norma_ipc_send.
66 * [91/12/26 19:51:59 jsb]
67 *
68 * Corrected log.
69 * [91/12/24 14:15:11 jsb]
70 *
71 * Revision 2.17 91/12/15 10:40:33 jsb
72 * Added norma_ipc_finish_receiving call to support large in-line msgs.
73 *
74 * Revision 2.16 91/12/14 14:27:10 jsb
75 * Removed ipc_fields.h hack.
76 *
77 * Revision 2.15 91/11/14 16:56:07 rpd
78 * Picked up mysterious norma changes.
79 * [91/11/14 rpd]
80 *
81 * Revision 2.14 91/08/28 11:13:34 jsb
82 * Added seqno argument to ipc_mqueue_receive.
83 * Also added seqno processing to ipc_mqueue_send, ipc_mqueue_move.
84 * [91/08/10 rpd]
85 * Fixed norma_ipc_handoff technology.
86 * Renamed clport things to norma_ipc things.
87 * [91/08/15 08:23:17 jsb]
88 *
89 * Revision 2.13 91/08/03 18:18:27 jsb
90 * Renamed replenish routine.
91 * [91/08/01 23:00:06 jsb]
92 *
93 * Removed obsolete include.
94 * Added option, timeout parameters to ipc_clport_send.
95 * [91/07/17 14:04:15 jsb]
96 *
97 * Revision 2.12 91/06/25 10:27:34 rpd
98 * Added some wait_result assertions.
99 * [91/05/30 rpd]
100 *
101 * Revision 2.11 91/06/17 15:46:18 jsb
102 * Renamed NORMA conditionals.
103 * [91/06/17 10:44:39 jsb]
104 *
105 * Revision 2.10 91/06/06 17:06:06 jsb
106 * Added call to ip_unlock after calling ipc_clport_send.
107 * Added support for clport handoff.
108 * [91/06/06 16:05:12 jsb]
109 *
110 * Revision 2.9 91/05/14 16:33:58 mrt
111 * Correcting copyright
112 *
113 * Revision 2.8 91/03/16 14:48:18 rpd
114 * Renamed ipc_thread_{go,will_wait,will_wait_with_timeout}
115 * to thread_{go,will_wait,will_wait_with_timeout}.
116 * Replaced ipc_thread_block with thread_block.
117 * [91/02/17 rpd]
118 *
119 * Revision 2.7 91/02/05 17:22:24 mrt
120 * Changed to new Mach copyright
121 * [91/02/01 15:46:33 mrt]
122 *
123 * Revision 2.6 91/01/08 15:14:35 rpd
124 * Changed continuation argument to (void (*)()).
125 * [90/12/18 rpd]
126 * Reorganized ipc_mqueue_receive.
127 * [90/11/22 rpd]
128 *
129 * Minor cleanup.
130 * [90/11/11 rpd]
131 *
132 * Revision 2.5 90/12/14 11:02:32 jsb
133 * Changed parameters in ipc_clport_send call.
134 * [90/12/13 21:20:13 jsb]
135 *
136 * Revision 2.4 90/11/05 14:29:04 rpd
137 * Use new io_reference and io_release.
138 * Use new ip_reference and ip_release.
139 * [90/10/29 rpd]
140 *
141 * Revision 2.3 90/09/28 16:54:58 jsb
142 * Added NORMA_IPC support.
143 * [90/09/28 14:03:24 jsb]
144 *
145 * Revision 2.2 90/06/02 14:50:39 rpd
146 * Created for new IPC.
147 * [90/03/26 20:57:06 rpd]
148 */
149 /* CMU_ENDHIST */
150 /*
151 * Mach Operating System
152 * Copyright (c) 1991,1990,1989 Carnegie Mellon University
153 * All Rights Reserved.
154 *
155 * Permission to use, copy, modify and distribute this software and its
156 * documentation is hereby granted, provided that both the copyright
157 * notice and this permission notice appear in all copies of the
158 * software, derivative works or modified versions, and any portions
159 * thereof, and that both notices appear in supporting documentation.
160 *
161 * CARNEGIE MELLON ALLOWS FREE USE OF THIS SOFTWARE IN ITS "AS IS"
162 * CONDITION. CARNEGIE MELLON DISCLAIMS ANY LIABILITY OF ANY KIND FOR
163 * ANY DAMAGES WHATSOEVER RESULTING FROM THE USE OF THIS SOFTWARE.
164 *
165 * Carnegie Mellon requests users of this software to return to
166 *
167 * Software Distribution Coordinator or Software.Distribution@CS.CMU.EDU
168 * School of Computer Science
169 * Carnegie Mellon University
170 * Pittsburgh PA 15213-3890
171 *
172 * any improvements or extensions that they make and grant Carnegie Mellon
173 * the rights to redistribute these changes.
174 */
175 /*
176 */
177 /*
178 * File: ipc/ipc_mqueue.c
179 * Author: Rich Draves
180 * Date: 1989
181 *
182 * Functions to manipulate IPC message queues.
183 */
184
185
186 #include <sys/mach/port.h>
187 #include <sys/mach/message.h>
188 #include <sys/mach/ipc_kobject.h>
189
190 #include <sys/mach/ipc/ipc_mqueue.h>
191 #include <sys/mach/ipc/ipc_thread.h>
192 #include <sys/mach/ipc/ipc_kmsg.h>
193 #include <sys/mach/ipc/ipc_port.h>
194 #include <sys/mach/ipc/ipc_pset.h>
195 #include <sys/mach/ipc/ipc_space.h>
196 #include <sys/mach/thread.h>
197
198
199 /*
200 * Routine: ipc_mqueue_init
201 * Purpose:
202 * Initialize a newly-allocated message queue.
203 */
204
205 void
ipc_mqueue_init(ipc_mqueue_t mqueue)206 ipc_mqueue_init(
207 ipc_mqueue_t mqueue)
208 {
209 ipc_kmsg_queue_init(&mqueue->imq_messages);
210 }
211
212 /*
213 * Routine: ipc_mqueue_send
214 * Purpose:
215 * Send a message to a port. The message holds a reference
216 * for the destination port in the msgh_remote_port field.
217 *
218 * If unsuccessful, the caller still has possession of
219 * the message and must do something with it. If successful,
220 * the message is queued, given to a receiver, destroyed,
221 * or handled directly by the kernel via mach_msg.
222 * Conditions:
223 * Nothing locked.
224 * Returns:
225 * MACH_MSG_SUCCESS The message was accepted.
226 * MACH_SEND_TIMED_OUT Caller still has message.
227 * MACH_SEND_INTERRUPTED Caller still has message.
228 */
229
230 mach_msg_return_t
ipc_mqueue_send(ipc_kmsg_t kmsg,mach_msg_option_t option,mach_msg_timeout_t timeout)231 ipc_mqueue_send(
232 ipc_kmsg_t kmsg,
233 mach_msg_option_t option,
234 mach_msg_timeout_t timeout)
235 {
236 ipc_port_t port;
237 kern_return_t save_wait_result;
238 ipc_thread_t self = current_thread();
239
240 port = (ipc_port_t) kmsg->ikm_header->msgh_remote_port;
241 assert(IP_VALID(port));
242 MACH_VERIFY(io_otype((ipc_object_t)port) == IOT_PORT, ("bad type %d\n", io_otype((ipc_object_t)port)));
243 assert(io_otype((ipc_object_t)port) == IOT_PORT);
244
245 ip_lock(port);
246
247 if (port->ip_receiver == ipc_space_kernel) {
248 ipc_kmsg_t reply;
249
250 /*
251 * We can check ip_receiver == ipc_space_kernel
252 * before checking that the port is active because
253 * ipc_port_dealloc_kernel clears ip_receiver
254 * before destroying a kernel port.
255 */
256
257 assert(ip_active(port));
258 ip_unlock(port);
259
260 reply = ipc_kobject_server(kmsg);
261 if (reply != IKM_NULL) {
262 self->ith_kmsg = reply;
263 self->ith_object = (ipc_object_t)port;
264 ip_lock(port);
265 self->ith_seqno = port->ip_seqno++;
266 ip_unlock(port);
267 /* ipc_mqueue_send_always(reply); */
268 }
269 #ifdef INVARIANTS
270 else
271 printf("reply from kobject == NULL\n");
272 #endif
273 return MACH_MSG_SUCCESS;
274 }
275
276 if (kmsg->ikm_header->msgh_bits & MACH_MSGH_BITS_CIRCULAR) {
277 ip_unlock(port);
278
279 /* don't allow the creation of a circular loop */
280
281 ipc_kmsg_destroy(kmsg);
282 return MACH_MSG_SUCCESS;
283 }
284
285 for (;;) {
286 ipc_thread_t self;
287
288 /*
289 * Can't deliver to a dead port.
290 * However, we can pretend it got sent
291 * and was then immediately destroyed.
292 */
293
294 if (!ip_active(port)) {
295 /*
296 * We can't let ipc_kmsg_destroy deallocate
297 * the port right, because we might end up
298 * in an infinite loop trying to deliver
299 * a send-once notification.
300 */
301
302 ip_unlock(port);
303 ip_release(port);
304 kmsg->ikm_header->msgh_remote_port = MACH_PORT_NULL;
305 ipc_kmsg_destroy(kmsg);
306 return MACH_MSG_SUCCESS;
307 }
308
309 /*
310 * Don't block if:
311 * 1) We're under the queue limit.
312 * 2) Caller used the MACH_SEND_ALWAYS internal option.
313 * 3) Message is sent to a send-once right.
314 */
315
316 if ((port->ip_msgcount < port->ip_qlimit) ||
317 (option & MACH_SEND_ALWAYS) ||
318 (MACH_MSGH_BITS_REMOTE(kmsg->ikm_header->msgh_bits) ==
319 MACH_MSG_TYPE_PORT_SEND_ONCE))
320 break;
321
322 /* must block waiting for queue to clear */
323
324 self = current_thread();
325
326 if (option & MACH_SEND_TIMEOUT) {
327 if (timeout == 0) {
328 ip_unlock(port);
329 return MACH_SEND_TIMED_OUT;
330 }
331 thread_will_wait_with_timeout(self, timeout);
332 } else {
333 thread_will_wait(self);
334 }
335
336 counter(c_ipc_mqueue_send_block++);
337 self->ith_state = MACH_SEND_IN_PROGRESS;
338 self->ith_block_lock_data = &port->port_comm.rcd_io_lock_data;
339 ipc_thread_enqueue(&port->ip_blocked, self);
340 thread_block();
341
342 /* Save proper wait_result in case we block */
343 save_wait_result = self->wait_result;
344
345 /* why did we wake up? - finish_receive will remove us from the queue */
346
347 if (self->ith_state == MACH_MSG_SUCCESS)
348 continue;
349 ipc_thread_rmqueue(&port->ip_blocked, self);
350 assert(self->ith_state == MACH_SEND_IN_PROGRESS);
351
352 /*
353 * Thread wakeup-reason field tells us why
354 * the wait was interrupted.
355 */
356
357 switch (save_wait_result) {
358 case THREAD_INTERRUPTED:
359 /* send was interrupted - give up */
360
361 ip_unlock(port);
362 return MACH_SEND_INTERRUPTED;
363
364 case THREAD_TIMED_OUT:
365 /* timeout expired */
366
367 assert(option & MACH_SEND_TIMEOUT);
368 timeout = 0;
369 break;
370
371 case THREAD_RESTART:
372 default:
373 panic("ipc_mqueue_send %d", save_wait_result);
374 }
375 }
376 (void) ipc_mqueue_deliver(port, kmsg, TRUE);
377
378 return MACH_MSG_SUCCESS;
379 }
380
381
382 /*
383 * ipc_mqueue_deliver: give the kmsg to a waiting receiver or else enqueue
384 * it on the port or pset.
385 *
386 * ipc_mqueue_deliver used to be the last block of code in ipc_mqueue_send.
387 * It is split out here so that the same code can be used to deliver DIPC
388 * kmsgs or meta_kmsgs.
389 *
390 * The port must be locked on entry; it will be unlocked on exit. This
391 * routine CAN NOT FAIL when called from thread context. However, from
392 * interrupt context it may not be able to complete its duties due to
393 * lock contention.
394 *
 *	Returns MACH_MSG_SUCCESS once the kmsg has been handed to a
 *	waiting receiver or queued on the port.
396 */
#if	TRACE_BUFFER
/* Non-zero enables the ipc_mqueue_deliver entry/exit trace records. */
int	tr_ipc_mqueue_deliver = 0;

/* Entry hook: calls tr_start(), then records the arguments if enabled. */
#define	TR_IPC_MQEN(fmt, port, kmsg, thread_context)			\
	tr_start();							\
	if (tr_ipc_mqueue_deliver)					\
		tr4((fmt), (port), (kmsg), (thread_context));

/* Exit hook: records the result if enabled, then calls tr_stop(). */
#define	TR_IPC_MQEX(fmt, kmsg)						\
	if (tr_ipc_mqueue_deliver)					\
		tr2((fmt), (kmsg));					\
	tr_stop();
#else	/* TRACE_BUFFER */
/* Tracing compiled out: both hooks expand to nothing. */
#define	TR_IPC_MQEN(fmt, port, kmsg, thread_context)
#define	TR_IPC_MQEX(fmt, kmsg)
#endif	/* TRACE_BUFFER */
411
412
413 static void
ipc_mqueue_run(thread_act_t receiver,ipc_mqueue_t mqueue,ipc_kmsg_t kmsg,ipc_port_t port)414 ipc_mqueue_run(thread_act_t receiver, ipc_mqueue_t mqueue, ipc_kmsg_t kmsg, ipc_port_t port)
415 {
416 MPASS(receiver->ith_state == MACH_RCV_IN_PROGRESS ||
417 receiver->ith_state == MACH_RCV_IN_PROGRESS_TIMED);
418 receiver->ith_state = MACH_MSG_SUCCESS;
419 receiver->ith_kmsg = kmsg;
420 receiver->ith_object = (ipc_object_t)port;
421 receiver->ith_seqno = port->ip_seqno++;
422 ip_unlock(port);
423 thread_go(receiver);
424 }
425
426 mach_msg_return_t
ipc_mqueue_deliver(register ipc_port_t port,register ipc_kmsg_t kmsg,boolean_t thread_context)427 ipc_mqueue_deliver(
428 register ipc_port_t port,
429 register ipc_kmsg_t kmsg,
430 boolean_t thread_context)
431 {
432 ipc_mqueue_t mqueue;
433 ipc_pset_t pset;
434 ipc_thread_t receiver;
435 TR_DECL("ipc_mqueue_deliver");
436
437 TR_IPC_MQEN("enter: port 0x%x kmsg 0x%x thd_ctxt %d", port, kmsg,
438 thread_context);
439
440 assert(IP_VALID(port));
441 assert(ip_active(port));
442
443 pset = port->ip_pset;
444 mqueue = &port->ip_messages;
445 receiver = NULL;
446
447 /* first we check the the port and portset for waiters */
448 if (pset != NULL) {
449 ips_lock(pset);
450 receiver = thread_pool_get_act((ipc_object_t)pset, 0);
451 ips_unlock(pset);
452 } else if (receiver == NULL) {
453 receiver = thread_pool_get_act((ipc_object_t)port, 0);
454 }
455 /* we have a receiver - we're done */
456 if (receiver != NULL) {
457 ipc_mqueue_run(receiver, mqueue, kmsg, port);
458 return (MACH_MSG_SUCCESS);
459 }
460
461 assert(port->ip_msgcount >= 0);
462 ipc_kmsg_enqueue_macro(&mqueue->imq_messages, kmsg);
463 port->ip_msgcount++;
464 ip_unlock(port);
465
466 if (pset)
467 ipc_pset_signal(pset);
468
469 TR_IPC_MQEX("exit: wakeup 0x%x", receiver);
470 return MACH_MSG_SUCCESS;
471 }
472
473
474 /*
475 * Routine: ipc_mqueue_copyin
476 * Purpose:
477 * Convert a name in a space to a message queue.
478 * Conditions:
479 * Nothing locked. If successful, the message queue
480 * is returned locked and caller gets a ref for the object.
481 * This ref ensures the continued existence of the queue.
482 * Returns:
483 * MACH_MSG_SUCCESS Found a message queue.
484 * MACH_RCV_INVALID_NAME The space is dead.
485 * MACH_RCV_INVALID_NAME The name doesn't denote a right.
486 * MACH_RCV_INVALID_NAME
487 * The denoted right is not receive or port set.
488 * MACH_RCV_IN_SET Receive right is a member of a set.
489 */
490
491 mach_msg_return_t
ipc_mqueue_copyin(ipc_space_t space,mach_port_name_t name,ipc_entry_bits_t * bitsp,ipc_object_t * objectp)492 ipc_mqueue_copyin(
493 ipc_space_t space,
494 mach_port_name_t name,
495 ipc_entry_bits_t *bitsp,
496 ipc_object_t *objectp)
497 {
498 ipc_entry_t entry;
499 ipc_entry_bits_t bits;
500 ipc_object_t object;
501 mach_msg_return_t mr;
502
503 is_read_lock(space);
504 if (!space->is_active) {
505 mr = MACH_RCV_INVALID_NAME;
506 goto error;
507 }
508
509 entry = ipc_entry_lookup(space, name);
510 if (entry == IE_NULL) {
511 mr = MACH_RCV_INVALID_NAME;
512 goto error;
513 }
514
515 bits = entry->ie_bits;
516 object = entry->ie_object;
517 ipc_object_reference(object);
518
519 if (bits & MACH_PORT_TYPE_RECEIVE) {
520 ipc_port_t port = NULL;
521
522 port = (ipc_port_t) object;
523 assert(port != IP_NULL);
524
525 assert(ip_active(port));
526 assert(port->ip_receiver_name == name);
527 assert(port->ip_receiver == space);
528 is_read_unlock(space);
529 } else if (bits & MACH_PORT_TYPE_PORT_SET) {
530 ipc_pset_t pset;
531
532 pset = (ipc_pset_t) object;
533 assert(pset != IPS_NULL);
534
535 assert(ips_active(pset));
536 assert(pset->ips_local_name == name);
537 is_read_unlock(space);
538 } else {
539 ipc_object_release(object);
540 mr = MACH_RCV_INVALID_NAME;
541 goto error;
542 }
543
544 /*
545 * At this point, the object is locked and active,
546 * the space is unlocked, and mqueue is initialized.
547 */
548
549 *objectp = object;
550 *bitsp = bits;
551 return MACH_MSG_SUCCESS;
552 error:
553 is_read_unlock(space);
554 return (mr);
555 }
556
557
558 static int
ipc_mqueue_receive_error(ipc_thread_t self,int save_wait_result,int option)559 ipc_mqueue_receive_error(ipc_thread_t self, int save_wait_result, int option)
560 {
561 switch (self->ith_state) {
562 case MACH_RCV_PORT_DIED:
563 case MACH_RCV_PORT_CHANGED:
564 /* something bad happened to the port/set */
565 return self->ith_state;
566 case MACH_RCV_IN_PROGRESS:
567 case MACH_RCV_IN_PROGRESS_TIMED:
568 /*
569 * Awakened for other than IPC completion.
570 * Remove ourself from the waiting queue,
571 * then check the wakeup cause.
572 */
573 if (self->ith_active) {
574 thread_pool_remove(self);
575 self->ith_block_lock_data = NULL;
576 self->ith_active = 0;
577 }
578 switch (save_wait_result) {
579 case THREAD_INTERRUPTED:
580 /* receive was interrupted - give up */
581 return MACH_RCV_INTERRUPTED;
582 case THREAD_TIMED_OUT:
583 /* timeout expired */
584 assert(option & MACH_RCV_TIMEOUT);
585 assert(self->ith_state == MACH_RCV_IN_PROGRESS_TIMED);
586 return (MACH_RCV_TIMED_OUT);
587 case THREAD_RESTART:
588 default:
589 panic("ipc_mqueue_receive: bad wait_result");
590 }
591 break;
592
593 default:
594 panic("ipc_mqueue_receive: strange ith_state");
595 }
596 }
597
598 static void
ipc_mqueue_post_on_thread(ipc_port_t port,mach_msg_option_t option,mach_msg_size_t max_size,thread_t thread)599 ipc_mqueue_post_on_thread(
600 ipc_port_t port,
601 mach_msg_option_t option,
602 mach_msg_size_t max_size,
603 thread_t thread)
604 {
605 ipc_kmsg_t kmsg;
606 ipc_mqueue_t mqueue = &port->ip_messages;
607
608 mach_msg_return_t mr = MACH_MSG_SUCCESS;
609 mach_msg_size_t rcv_size;
610 vm_map_t map = current_map();
611
612 /*
613 * Do some sanity checking of our ability to receive
614 * before pulling the message off the queue.
615 */
616 kmsg = ipc_kmsg_queue_first(&mqueue->imq_messages);
617 assert(kmsg != IKM_NULL);
618
619 /*
620 * If we really can't receive it, but we had the
621 * MACH_RCV_LARGE option set, then don't take it off
622 * the queue, instead return the appropriate error
623 * (and size needed).
624 */
625 rcv_size = ipc_kmsg_copyout_size(kmsg, map);
626 if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
627 mr = MACH_RCV_TOO_LARGE;
628 if (option & MACH_RCV_LARGE) {
629 thread->ith_receiver_name = port->ip_receiver_name;
630 thread->ith_kmsg = IKM_NULL;
631 thread->ith_msize = rcv_size;
632 thread->ith_seqno = 0;
633 thread->ith_state = mr;
634 return;
635 }
636 }
637
638 ipc_kmsg_rmqueue_first_macro(&mqueue->imq_messages, kmsg);
639 assert(port->ip_msgcount > 0);
640 port->ip_msgcount--;
641
642 thread->ith_object = (ipc_object_t)port;
643 thread->ith_seqno = port->ip_seqno++;
644 thread->ith_kmsg = kmsg;
645 thread->ith_state = mr;
646
647 current_task()->messages_received++;
648 }
649
650 mach_msg_return_t
ipc_mqueue_pset_receive(natural_t bits,mach_msg_option_t option,mach_msg_size_t max_size,mach_msg_timeout_t timeout,thread_t thread)651 ipc_mqueue_pset_receive(
652 natural_t bits,
653 mach_msg_option_t option,
654 mach_msg_size_t max_size,
655 mach_msg_timeout_t timeout,
656 thread_t thread)
657 {
658 ipc_port_t port;
659 ipc_pset_t pset;
660
661 pset = (ipc_pset_t)thread->ith_object;
662 assert(io_otype(thread->ith_object) == IOT_PORT_SET);
663 restart:
664 TAILQ_FOREACH(port, &pset->ips_ports, ip_next) {
665 mtx_assert(&port->port_comm.rcd_io_lock_data, MA_NOTOWNED);
666 assert (port->ip_msgcount >= 0);
667 if (port->ip_msgcount != 0) {
668 if (ip_lock_try(port) == 0) {
669 ips_unlock(pset);
670 ip_lock(port);
671 ips_lock(pset);
672 }
673 /* one way or another we have the lock */
674 break;
675 }
676 }
677 if (port != NULL && port->ip_msgcount == 0) {
678 mtx_assert(&port->port_comm.rcd_io_lock_data, MA_OWNED);
679 ip_unlock(port);
680 goto restart;
681 }
682 if (port != NULL) {
683 mtx_assert(&port->port_comm.rcd_io_lock_data, MA_OWNED);
684 ipc_mqueue_post_on_thread(port, option, max_size, thread);
685 ip_unlock(port);
686 thread->ith_object = (ipc_object_t)port;
687 return (THREAD_NOT_WAITING);
688 }
689 if ((option & MACH_RCV_TIMEOUT) && (timeout == 0)) {
690 thread->ith_state = MACH_RCV_TIMED_OUT;
691 return (THREAD_NOT_WAITING);
692 }
693
694 return (THREAD_WAITING);
695 }
696
697 /*
698 * Routine: ipc_mqueue_receive
699 * Purpose:
700 * Receive a message from a message queue.
701 *
702 * Conditions:
703 * The message queue is locked; it will be returned unlocked.
704 *
705 * Our caller must hold a reference for the port or port set
706 * to which this queue belongs, to keep the queue
707 * from being deallocated. Furthermore, the port or set
708 * must have been active when the queue was locked.
709 *
710 * The kmsg is returned with clean header fields
711 * and with the circular bit turned off.
712 * Returns:
713 * MACH_MSG_SUCCESS Message returned in kmsgp.
714 * MACH_RCV_TOO_LARGE Message size returned in kmsgp.
715 * MACH_RCV_TIMED_OUT No message obtained.
716 * MACH_RCV_INTERRUPTED No message obtained.
717 * MACH_RCV_PORT_DIED Port/set died; no message.
718 * MACH_RCV_PORT_CHANGED Port moved into set; no msg.
719 *
720 */
721
722
723 mach_msg_return_t
ipc_mqueue_receive(natural_t bits,mach_msg_option_t option,mach_msg_size_t max_size,mach_msg_timeout_t timeout,ipc_kmsg_t * kmsgp,mach_port_seqno_t * seqnop,thread_t thread)724 ipc_mqueue_receive(
725 natural_t bits,
726 mach_msg_option_t option,
727 mach_msg_size_t max_size,
728 mach_msg_timeout_t timeout,
729 ipc_kmsg_t *kmsgp,
730 mach_port_seqno_t *seqnop,
731 thread_t thread)
732 {
733 ipc_port_t port;
734 ipc_pset_t pset;
735 ipc_kmsg_t kmsg;
736 ipc_mqueue_t mqueue;
737 mach_port_seqno_t seqno;
738 mach_msg_return_t mr;
739 ipc_kmsg_queue_t kmsgs;
740 thread_t self;
741 kern_return_t save_wait_result;
742 int rc;
743
744 /* logic currently too confused to support anything else */
745 MPASS(thread == current_thread());
746 MPASS(thread->ith_object != NULL);
747 assert(io_otype(thread->ith_object) == IOT_PORT || io_otype(thread->ith_object) == IOT_PORT_SET);
748 self = thread;
749 pset = NULL;
750
751 io_lock(thread->ith_object);
752 io_reference(thread->ith_object);
753 if (thread->ith_kmsg != NULL) {
754 thread->ith_state = MACH_MSG_SUCCESS;
755 goto rx_done;
756 }
757
758 if (bits & MACH_PORT_TYPE_PORT_SET) {
759 pset = (ipc_pset_t)thread->ith_object;
760
761 rc = ipc_mqueue_pset_receive(bits, option, max_size, timeout, thread);
762 if (rc == THREAD_NOT_WAITING) {
763 if (thread->ith_state == MACH_RCV_TIMED_OUT || thread->ith_state == MACH_RCV_TOO_LARGE) {
764 ips_unlock(pset);
765 ips_release(pset);
766 return (thread->ith_state);
767 } else {
768 kmsg = thread->ith_kmsg;
769 seqno = thread->ith_seqno;
770 MPASS(pset != (ipc_pset_t)thread->ith_object);
771 /* drop passed in pset lock and acquire the port lock */
772 ips_unlock(pset);
773 ips_release(pset);
774 pset = NULL;
775
776 io_lock(thread->ith_object);
777 io_reference(thread->ith_object);
778 goto rx_done;
779 }
780 }
781 assert(io_otype(thread->ith_object) == IOT_PORT_SET);
782 } else {
783 port = (ipc_port_t)thread->ith_object;
784 assert(port->ip_msgcount >= 0);
785 mqueue = &port->ip_messages;
786 kmsgs = &mqueue->imq_messages;
787 kmsg = ipc_kmsg_queue_first(kmsgs);
788 /* a message is already on the queue */
789 if (kmsg != IKM_NULL) {
790 ipc_mqueue_post_on_thread(port, option, max_size, thread);
791 if (thread->ith_state == MACH_MSG_SUCCESS)
792 goto rx_done;
793 else {
794 io_unlock(thread->ith_object);
795 io_release(thread->ith_object);
796 return (thread->ith_state);
797 }
798 }
799 }
800
801 /* must block waiting for a message */
802 if (option & MACH_RCV_TIMEOUT) {
803 if (timeout == 0) {
804 return MACH_RCV_TIMED_OUT;
805 }
806
807 self->ith_state = MACH_RCV_IN_PROGRESS_TIMED;
808 } else {
809 self->ith_state = MACH_RCV_IN_PROGRESS;
810 timeout = 0;
811 }
812 thread_will_wait_with_timeout(self, timeout);
813
814 self->ith_active = 1;
815 self->ith_block_lock_data = &((rpc_common_t)(self->ith_object))->rcd_io_lock_data;
816 thread_pool_put_act(self);
817
818 self->ith_msize = max_size;
819 thread_block();
820 /* Save proper wait_result in case we block */
821 save_wait_result = self->wait_result;
822
823 /* why did we wake up? */
824 if (self->ith_state != MACH_MSG_SUCCESS)
825 goto error;
826
827 if (pset) {
828 ips_unlock(pset);
829 ips_release(pset);
830 io_reference(self->ith_object);
831 io_lock(self->ith_object);
832 }
833
834 rx_done:
835 assert(io_otype(self->ith_object) == IOT_PORT);
836 assert(self->ith_kmsg != NULL);
837 *kmsgp = self->ith_kmsg;
838 *seqnop = self->ith_seqno;
839 port = (ipc_port_t)thread->ith_object;
840
841 assert(io_otype(self->ith_object) == IOT_PORT);
842
843 mr = ipc_mqueue_finish_receive(kmsgp, port, option, max_size);
844
845 io_unlock(self->ith_object);
846 io_release(self->ith_object);
847 self->ith_kmsg = NULL;
848 self->ith_object = NULL;
849 return (mr);
850 error:
851 mr = ipc_mqueue_receive_error(self, save_wait_result, option);
852
853 io_unlock(self->ith_object);
854 io_release(self->ith_object);
855 self->ith_kmsg = NULL;
856 self->ith_object = NULL;
857 return (mr);
858 }
859
860
861 mach_msg_return_t
ipc_mqueue_finish_receive(ipc_kmsg_t * kmsgp,ipc_port_t port,mach_msg_option_t option,mach_msg_size_t max_size)862 ipc_mqueue_finish_receive(
863 ipc_kmsg_t *kmsgp,
864 ipc_port_t port,
865 mach_msg_option_t option,
866 mach_msg_size_t max_size)
867 {
868 ipc_kmsg_t kmsg;
869 mach_msg_return_t mr;
870 ipc_thread_t self = current_thread();
871 mach_msg_size_t rcv_size;
872
873 mr = MACH_MSG_SUCCESS;
874 kmsg = *kmsgp;
875 /* check sizes */
876
877 rcv_size = ipc_kmsg_copyout_size(kmsg, thread_map(self));
878 if (rcv_size + REQUESTED_TRAILER_SIZE(option) > max_size) {
879 /* the receive buffer isn't large enough */
880 if (mach_debug_enable) {
881 printf("%s max_size=%d REQUESTED_TRAILER_SIZE(option=%d)=%d\n",
882 curthread->td_proc->p_comm, max_size, option, REQUESTED_TRAILER_SIZE(option));
883 /* ipc_kmsg_print(kmsg); */
884 printf("rcv_size=%d\n", rcv_size);
885 }
886 mr = MACH_RCV_TOO_LARGE;
887 } else if (self->ith_scatter_list != MACH_MSG_BODY_NULL) {
888 /* verify the scatter list */
889 mr = ipc_kmsg_check_scatter(kmsg,
890 self->ith_option,
891 &self->ith_scatter_list,
892 &self->ith_scatter_list_size);
893 }
894
895 if (mr == MACH_MSG_SUCCESS) {
896 assert((kmsg->ikm_header->msgh_bits & MACH_MSGH_BITS_CIRCULAR)
897 == 0);
898 }
899
900 if (ip_active(port)) {
901 ipc_thread_queue_t senders;
902 ipc_thread_t sender;
903
904 assert(port->ip_msgcount >= 0);
905 senders = &port->ip_blocked;
906 sender = ipc_thread_queue_first(senders);
907
908 MPASS(sender == NULL || sender->ith_state == MACH_SEND_IN_PROGRESS);
909
910 if ((sender != ITH_NULL) &&
911 (port->ip_msgcount < port->ip_qlimit)) {
912 ipc_thread_rmqueue(senders, sender);
913 sender->ith_state = MACH_MSG_SUCCESS;
914 thread_go(sender);
915 }
916 }
917 *kmsgp = kmsg;
918 return mr;
919 }
920