xref: /dragonfly/sys/kern/uipc_msg.c (revision 22db36083bc98aa8a7876ca8e7811e6a33362f59)
1 /*
2  * Copyright (c) 2003, 2004 Jeffrey M. Hsu.  All rights reserved.
3  * Copyright (c) 2003, 2004 The DragonFly Project.  All rights reserved.
4  *
5  * This code is derived from software contributed to The DragonFly Project
6  * by Jeffrey M. Hsu.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the name of The DragonFly Project nor the names of its
17  *    contributors may be used to endorse or promote products derived
18  *    from this software without specific, prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
23  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
24  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
25  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
26  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
27  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
28  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
29  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
30  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
31  * SUCH DAMAGE.
32  */
33 
34 #include <sys/param.h>
35 #include <sys/systm.h>
36 #include <sys/kernel.h>
37 #include <sys/malloc.h>
38 #include <sys/msgport.h>
39 #include <sys/protosw.h>
40 #include <sys/socket.h>
41 #include <sys/socketvar.h>
42 #include <sys/socketops.h>
43 #include <sys/thread.h>
44 #include <sys/msgport2.h>
45 #include <sys/spinlock2.h>
46 #include <sys/sysctl.h>
47 #include <sys/mbuf.h>
48 #include <vm/pmap.h>
49 
50 #include <net/netmsg2.h>
51 #include <net/netisr2.h>
52 #include <sys/socketvar2.h>
53 
54 #include <net/netisr.h>
55 #include <net/netmsg.h>
56 
57 static int async_rcvd_drop_race = 0;
58 SYSCTL_INT(_kern_ipc, OID_AUTO, async_rcvd_drop_race, CTLFLAG_RW,
59     &async_rcvd_drop_race, 0, "# of asynchronized pru_rcvd msg drop races");
60 
61 /*
62  * Abort a socket and free it, asynchronously.  Called from
63  * soabort_async() only.  soabort_async() got a ref on the
64  * socket which we must free on reply.
65  */
66 void
so_pru_abort_async(struct socket * so)67 so_pru_abort_async(struct socket *so)
68 {
69           struct netmsg_pru_abort *msg;
70 
71           msg = kmalloc(sizeof(*msg), M_LWKTMSG, M_WAITOK | M_ZERO);
72           netmsg_init(&msg->base, so, &netisr_afree_free_so_rport,
73                         0, so->so_proto->pr_usrreqs->pru_abort);
74           lwkt_sendmsg(so->so_port, &msg->base.lmsg);
75 }
76 
77 /*
78  * Abort a socket and free it.  Called from soabort_direct() only.
79  * Caller must make sure that the current CPU is inpcb's owner CPU.
80  * soabort_direct() got a ref on the socket which we must free.
81  */
82 void
so_pru_abort_direct(struct socket * so)83 so_pru_abort_direct(struct socket *so)
84 {
85           struct netmsg_pru_abort msg;
86           netisr_fn_t func = so->so_proto->pr_usrreqs->pru_abort;
87 
88           netmsg_init(&msg.base, so, &netisr_adone_rport, 0, func);
89           msg.base.lmsg.ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
90           msg.base.lmsg.ms_flags |= MSGF_SYNC;
91           func((netmsg_t)&msg);
92           KKASSERT(msg.base.lmsg.ms_flags & MSGF_DONE);
93           sofree(msg.base.nm_so);
94 }
95 
96 int
so_pru_accept(struct socket * so,struct sockaddr ** nam)97 so_pru_accept(struct socket *so, struct sockaddr **nam)
98 {
99           struct netmsg_pru_accept msg;
100 
101           netmsg_init(&msg.base, so, &curthread->td_msgport,
102               0, so->so_proto->pr_usrreqs->pru_accept);
103           msg.nm_nam = nam;
104 
105           return lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
106 }
107 
108 int
so_pru_attach(struct socket * so,int proto,struct pru_attach_info * ai)109 so_pru_attach(struct socket *so, int proto, struct pru_attach_info *ai)
110 {
111           struct netmsg_pru_attach msg;
112           int error;
113 
114           netmsg_init(&msg.base, so, &curthread->td_msgport,
115                         0, so->so_proto->pr_usrreqs->pru_attach);
116           msg.nm_proto = proto;
117           msg.nm_ai = ai;
118           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
119           return (error);
120 }
121 
122 int
so_pru_attach_direct(struct socket * so,int proto,struct pru_attach_info * ai)123 so_pru_attach_direct(struct socket *so, int proto, struct pru_attach_info *ai)
124 {
125           struct netmsg_pru_attach msg;
126           netisr_fn_t func = so->so_proto->pr_usrreqs->pru_attach;
127 
128           netmsg_init(&msg.base, so, &netisr_adone_rport, 0, func);
129           msg.base.lmsg.ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
130           msg.base.lmsg.ms_flags |= MSGF_SYNC;
131           msg.nm_proto = proto;
132           msg.nm_ai = ai;
133           func((netmsg_t)&msg);
134           KKASSERT(msg.base.lmsg.ms_flags & MSGF_DONE);
135           return(msg.base.lmsg.ms_error);
136 }
137 
138 int
so_pru_attach_fast(struct socket * so,int proto,struct pru_attach_info * ai)139 so_pru_attach_fast(struct socket *so, int proto, struct pru_attach_info *ai)
140 {
141           struct netmsg_pru_attach *msg;
142           int error;
143 
144           error = so->so_proto->pr_usrreqs->pru_preattach(so, proto, ai);
145           if (error)
146                     return error;
147 
148           msg = kmalloc(sizeof(*msg), M_LWKTMSG, M_WAITOK | M_NULLOK);
149           if (msg == NULL) {
150                     /*
151                      * Fail to allocate message; fallback to
152                      * synchronized pru_attach.
153                      */
154                     return so_pru_attach(so, proto, NULL /* postattach */);
155           }
156 
157           netmsg_init(&msg->base, so, &netisr_afree_rport, 0,
158               so->so_proto->pr_usrreqs->pru_attach);
159           msg->nm_proto = proto;
160           msg->nm_ai = NULL; /* postattach */
161           if (so->so_port == netisr_curport())
162                     lwkt_sendmsg_oncpu(so->so_port, &msg->base.lmsg);
163           else
164                     lwkt_sendmsg(so->so_port, &msg->base.lmsg);
165 
166           return 0;
167 }
168 
169 /*
170  * NOTE: If the target port changes the bind operation will deal with it.
171  */
172 int
so_pru_bind(struct socket * so,struct sockaddr * nam,struct thread * td)173 so_pru_bind(struct socket *so, struct sockaddr *nam, struct thread *td)
174 {
175           struct netmsg_pru_bind msg;
176           int error;
177 
178           netmsg_init(&msg.base, so, &curthread->td_msgport,
179                         0, so->so_proto->pr_usrreqs->pru_bind);
180           msg.nm_nam = nam;
181           msg.nm_td = td;               /* used only for prison_ip() */
182           msg.nm_flags = 0;
183           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
184           return (error);
185 }
186 
187 int
so_pru_connect(struct socket * so,struct sockaddr * nam,struct thread * td)188 so_pru_connect(struct socket *so, struct sockaddr *nam, struct thread *td)
189 {
190           struct netmsg_pru_connect msg;
191           int error;
192 
193           netmsg_init(&msg.base, so, &curthread->td_msgport,
194                         0, so->so_proto->pr_usrreqs->pru_connect);
195           msg.nm_nam = nam;
196           msg.nm_td = td;
197           msg.nm_m = NULL;
198           msg.nm_sndflags = 0;
199           msg.nm_flags = 0;
200           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
201           return (error);
202 }
203 
204 int
so_pru_connect_async(struct socket * so,struct sockaddr * nam,struct thread * td)205 so_pru_connect_async(struct socket *so, struct sockaddr *nam, struct thread *td)
206 {
207           struct netmsg_pru_connect *msg;
208           int error, flags;
209 
210           KASSERT(so->so_proto->pr_usrreqs->pru_preconnect != NULL,
211               ("async pru_connect is not supported"));
212 
213           /* NOTE: sockaddr immediately follows netmsg */
214           msg = kmalloc(sizeof(*msg) + nam->sa_len, M_LWKTMSG,
215               M_WAITOK | M_NULLOK);
216           if (msg == NULL) {
217                     /*
218                      * Fail to allocate message; fallback to
219                      * synchronized pru_connect.
220                      */
221                     return so_pru_connect(so, nam, td);
222           }
223 
224           error = so->so_proto->pr_usrreqs->pru_preconnect(so, nam, td);
225           if (error) {
226                     kfree(msg, M_LWKTMSG);
227                     return error;
228           }
229 
230           flags = PRUC_ASYNC;
231           if (td != NULL && (so->so_proto->pr_flags & PR_ACONN_HOLDTD)) {
232                     lwkt_hold(td);
233                     flags |= PRUC_HELDTD;
234           }
235 
236           netmsg_init(&msg->base, so, &netisr_afree_rport, 0,
237               so->so_proto->pr_usrreqs->pru_connect);
238           msg->nm_nam = (struct sockaddr *)(msg + 1);
239           memcpy(msg->nm_nam, nam, nam->sa_len);
240           msg->nm_td = td;
241           msg->nm_m = NULL;
242           msg->nm_sndflags = 0;
243           msg->nm_flags = flags;
244           if (so->so_port == netisr_curport())
245                     lwkt_sendmsg_oncpu(so->so_port, &msg->base.lmsg);
246           else
247                     lwkt_sendmsg(so->so_port, &msg->base.lmsg);
248           return 0;
249 }
250 
251 int
so_pru_connect2(struct socket * so1,struct socket * so2,struct ucred * cred)252 so_pru_connect2(struct socket *so1, struct socket *so2, struct ucred *cred)
253 {
254           struct netmsg_pru_connect2 msg;
255           int error;
256 
257           netmsg_init(&msg.base, so1, &curthread->td_msgport,
258                         0, so1->so_proto->pr_usrreqs->pru_connect2);
259           msg.nm_so1 = so1;
260           msg.nm_so2 = so2;
261           msg.nm_cred = cred;
262           error = lwkt_domsg(so1->so_port, &msg.base.lmsg, 0);
263           return (error);
264 }
265 
266 /*
267  * WARNING!  Synchronous call from user context.  Control function may do
268  *             copyin/copyout.
269  */
270 int
so_pru_control_direct(struct socket * so,u_long cmd,caddr_t data,struct ifnet * ifp)271 so_pru_control_direct(struct socket *so, u_long cmd, caddr_t data,
272                           struct ifnet *ifp)
273 {
274           struct netmsg_pru_control msg;
275           netisr_fn_t func = so->so_proto->pr_usrreqs->pru_control;
276 
277           netmsg_init(&msg.base, so, &netisr_adone_rport, 0, func);
278           msg.base.lmsg.ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
279           msg.base.lmsg.ms_flags |= MSGF_SYNC;
280           msg.nm_cmd = cmd;
281           msg.nm_data = data;
282           msg.nm_ifp = ifp;
283           msg.nm_td = curthread;
284           func((netmsg_t)&msg);
285           KKASSERT(msg.base.lmsg.ms_flags & MSGF_DONE);
286           return(msg.base.lmsg.ms_error);
287 }
288 
289 int
so_pru_detach(struct socket * so)290 so_pru_detach(struct socket *so)
291 {
292           struct netmsg_pru_detach msg;
293           int error;
294 
295           netmsg_init(&msg.base, so, &curthread->td_msgport,
296                         0, so->so_proto->pr_usrreqs->pru_detach);
297           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
298           return (error);
299 }
300 
301 int
so_pru_detach_direct(struct socket * so)302 so_pru_detach_direct(struct socket *so)
303 {
304           struct netmsg_pru_detach msg;
305           netisr_fn_t func = so->so_proto->pr_usrreqs->pru_detach;
306 
307           netmsg_init(&msg.base, so, &netisr_adone_rport, 0, func);
308           msg.base.lmsg.ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
309           msg.base.lmsg.ms_flags |= MSGF_SYNC;
310           func((netmsg_t)&msg);
311           KKASSERT(msg.base.lmsg.ms_flags & MSGF_DONE);
312           return(msg.base.lmsg.ms_error);
313 }
314 
315 int
so_pru_disconnect(struct socket * so)316 so_pru_disconnect(struct socket *so)
317 {
318           struct netmsg_pru_disconnect msg;
319           int error;
320 
321           netmsg_init(&msg.base, so, &curthread->td_msgport,
322                         0, so->so_proto->pr_usrreqs->pru_disconnect);
323           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
324           return (error);
325 }
326 
327 void
so_pru_disconnect_direct(struct socket * so)328 so_pru_disconnect_direct(struct socket *so)
329 {
330           struct netmsg_pru_disconnect msg;
331           netisr_fn_t func = so->so_proto->pr_usrreqs->pru_disconnect;
332 
333           netmsg_init(&msg.base, so, &netisr_adone_rport, 0, func);
334           msg.base.lmsg.ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
335           msg.base.lmsg.ms_flags |= MSGF_SYNC;
336           func((netmsg_t)&msg);
337           KKASSERT(msg.base.lmsg.ms_flags & MSGF_DONE);
338 }
339 
340 int
so_pru_listen(struct socket * so,struct thread * td)341 so_pru_listen(struct socket *so, struct thread *td)
342 {
343           struct netmsg_pru_listen msg;
344           int error;
345 
346           netmsg_init(&msg.base, so, &curthread->td_msgport,
347                         0, so->so_proto->pr_usrreqs->pru_listen);
348           msg.nm_td = td;               /* used only for prison_ip() XXX JH */
349           msg.nm_flags = 0;
350           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
351           return (error);
352 }
353 
354 int
so_pru_peeraddr(struct socket * so,struct sockaddr ** nam)355 so_pru_peeraddr(struct socket *so, struct sockaddr **nam)
356 {
357           struct netmsg_pru_peeraddr msg;
358           int error;
359 
360           netmsg_init(&msg.base, so, &curthread->td_msgport,
361                         0, so->so_proto->pr_usrreqs->pru_peeraddr);
362           msg.nm_nam = nam;
363           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
364           return (error);
365 }
366 
367 int
so_pru_rcvd(struct socket * so,int flags)368 so_pru_rcvd(struct socket *so, int flags)
369 {
370           struct netmsg_pru_rcvd msg;
371           int error;
372 
373           netmsg_init(&msg.base, so, &curthread->td_msgport,
374                         0, so->so_proto->pr_usrreqs->pru_rcvd);
375           msg.nm_flags = flags;
376           msg.nm_pru_flags = 0;
377           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
378           return (error);
379 }
380 
381 void
so_pru_rcvd_async(struct socket * so)382 so_pru_rcvd_async(struct socket *so)
383 {
384           lwkt_msg_t lmsg = &so->so_rcvd_msg.base.lmsg;
385 
386           KASSERT(so->so_proto->pr_flags & PR_ASYNC_RCVD,
387               ("async pru_rcvd is not supported"));
388 
389           /*
390            * WARNING!  Spinlock is a bit dodgy, use hacked up sendmsg
391            *             to avoid deadlocking.
392            */
393           spin_lock(&so->so_rcvd_spin);
394           if ((so->so_rcvd_msg.nm_pru_flags & PRUR_DEAD) == 0) {
395                     if (lmsg->ms_flags & MSGF_DONE) {
396                               lwkt_sendmsg_prepare(so->so_port, lmsg);
397                               spin_unlock(&so->so_rcvd_spin);
398                               if (so->so_port == netisr_curport())
399                                         lwkt_sendmsg_start_oncpu(so->so_port, lmsg);
400                               else
401                                         lwkt_sendmsg_start(so->so_port, lmsg);
402                     } else {
403                               spin_unlock(&so->so_rcvd_spin);
404                     }
405           } else {
406                     spin_unlock(&so->so_rcvd_spin);
407           }
408 }
409 
410 int
so_pru_rcvoob(struct socket * so,struct mbuf * m,int flags)411 so_pru_rcvoob(struct socket *so, struct mbuf *m, int flags)
412 {
413           struct netmsg_pru_rcvoob msg;
414           int error;
415 
416           netmsg_init(&msg.base, so, &curthread->td_msgport,
417                         0, so->so_proto->pr_usrreqs->pru_rcvoob);
418           msg.nm_m = m;
419           msg.nm_flags = flags;
420           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
421           return (error);
422 }
423 
424 /*
425  * NOTE: If the target port changes the implied connect will deal with it.
426  */
427 int
so_pru_send(struct socket * so,int flags,struct mbuf * m,struct sockaddr * addr,struct mbuf * control,struct thread * td)428 so_pru_send(struct socket *so, int flags, struct mbuf *m,
429               struct sockaddr *addr, struct mbuf *control, struct thread *td)
430 {
431           struct netmsg_pru_send msg;
432           int error;
433 
434           netmsg_init(&msg.base, so, &curthread->td_msgport,
435                         0, so->so_proto->pr_usrreqs->pru_send);
436           msg.nm_flags = flags;
437           msg.nm_m = m;
438           msg.nm_addr = addr;
439           msg.nm_control = control;
440           msg.nm_td = td;
441           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
442           return (error);
443 }
444 
445 void
so_pru_sync(struct socket * so)446 so_pru_sync(struct socket *so)
447 {
448           struct netmsg_base msg;
449 
450           netmsg_init(&msg, so, &curthread->td_msgport, 0,
451               netmsg_sync_handler);
452           lwkt_domsg(so->so_port, &msg.lmsg, 0);
453 }
454 
455 void
so_pru_send_async(struct socket * so,int flags,struct mbuf * m,struct sockaddr * addr0,struct mbuf * control,struct thread * td)456 so_pru_send_async(struct socket *so, int flags, struct mbuf *m,
457     struct sockaddr *addr0, struct mbuf *control, struct thread *td)
458 {
459           struct netmsg_pru_send *msg;
460           struct sockaddr *addr = NULL;
461 
462           KASSERT(so->so_proto->pr_flags & PR_ASYNC_SEND,
463               ("async pru_send is not supported"));
464 
465           if (addr0 != NULL) {
466                     addr = kmalloc(addr0->sa_len, M_SONAME, M_WAITOK | M_NULLOK);
467                     if (addr == NULL) {
468                               /*
469                                * Fail to allocate address; fallback to
470                                * synchronized pru_send.
471                                */
472                               so_pru_send(so, flags, m, addr0, control, td);
473                               return;
474                     }
475                     memcpy(addr, addr0, addr0->sa_len);
476                     flags |= PRUS_FREEADDR;
477           }
478           flags |= PRUS_NOREPLY;
479 
480           if (td != NULL && (so->so_proto->pr_flags & PR_ASEND_HOLDTD)) {
481                     lwkt_hold(td);
482                     flags |= PRUS_HELDTD;
483           }
484 
485           msg = &m->m_hdr.mh_sndmsg;
486           netmsg_init(&msg->base, so, &netisr_apanic_rport,
487                         0, so->so_proto->pr_usrreqs->pru_send);
488           msg->nm_flags = flags;
489           msg->nm_m = m;
490           msg->nm_addr = addr;
491           msg->nm_control = control;
492           msg->nm_td = td;
493           if (so->so_port == netisr_curport())
494                     lwkt_sendmsg_oncpu(so->so_port, &msg->base.lmsg);
495           else
496                     lwkt_sendmsg(so->so_port, &msg->base.lmsg);
497 }
498 
499 int
so_pru_sense(struct socket * so,struct stat * sb)500 so_pru_sense(struct socket *so, struct stat *sb)
501 {
502           struct netmsg_pru_sense msg;
503           int error;
504 
505           netmsg_init(&msg.base, so, &curthread->td_msgport,
506                         0, so->so_proto->pr_usrreqs->pru_sense);
507           msg.nm_stat = sb;
508           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
509           return (error);
510 }
511 
512 int
so_pru_shutdown(struct socket * so)513 so_pru_shutdown(struct socket *so)
514 {
515           struct netmsg_pru_shutdown msg;
516           int error;
517 
518           netmsg_init(&msg.base, so, &curthread->td_msgport,
519                         0, so->so_proto->pr_usrreqs->pru_shutdown);
520           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
521           return (error);
522 }
523 
524 int
so_pru_sockaddr(struct socket * so,struct sockaddr ** nam)525 so_pru_sockaddr(struct socket *so, struct sockaddr **nam)
526 {
527           struct netmsg_pru_sockaddr msg;
528           int error;
529 
530           netmsg_init(&msg.base, so, &curthread->td_msgport,
531                         0, so->so_proto->pr_usrreqs->pru_sockaddr);
532           msg.nm_nam = nam;
533           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
534           return (error);
535 }
536 
537 int
so_pr_ctloutput(struct socket * so,struct sockopt * sopt)538 so_pr_ctloutput(struct socket *so, struct sockopt *sopt)
539 {
540           struct netmsg_pr_ctloutput msg;
541           int error;
542 
543           KKASSERT(!sopt->sopt_val || kva_p(sopt->sopt_val));
544 
545           if (sopt->sopt_dir == SOPT_SET && so->so_proto->pr_ctloutmsg != NULL) {
546                     struct netmsg_pr_ctloutput *amsg;
547 
548                     /* Fast path: asynchronous pr_ctloutput */
549                     amsg = so->so_proto->pr_ctloutmsg(sopt);
550                     if (amsg != NULL) {
551                               netmsg_init(&amsg->base, so, &netisr_afree_rport, 0,
552                                   so->so_proto->pr_ctloutput);
553                               /* nm_flags and nm_sopt are setup by pr_ctloutmsg */
554                               if (so->so_port == netisr_curport()) {
555                                         lwkt_sendmsg_oncpu(so->so_port,
556                                             &amsg->base.lmsg);
557                               } else {
558                                         lwkt_sendmsg(so->so_port, &amsg->base.lmsg);
559                               }
560                               return 0;
561                     }
562                     /* FALLTHROUGH */
563           }
564 
565           netmsg_init(&msg.base, so, &curthread->td_msgport,
566                         0, so->so_proto->pr_ctloutput);
567           msg.nm_flags = 0;
568           msg.nm_sopt = sopt;
569           error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
570           return (error);
571 }
572 
573 struct lwkt_port *
so_pr_ctlport(struct protosw * pr,int cmd,struct sockaddr * arg,void * extra,int * cpuid)574 so_pr_ctlport(struct protosw *pr, int cmd, struct sockaddr *arg,
575     void *extra, int *cpuid)
576 {
577           if (pr->pr_ctlport == NULL)
578                     return NULL;
579           KKASSERT(pr->pr_ctlinput != NULL);
580 
581           return pr->pr_ctlport(cmd, arg, extra, cpuid);
582 }
583 
584 /*
585  * Protocol control input, typically via icmp.
586  *
587  * If the protocol pr_ctlport is not NULL we call it to figure out the
588  * protocol port.  If NULL is returned we can just return, otherwise
589  * we issue a netmsg to call pr_ctlinput in the proper thread.
590  *
591  * This must be done synchronously as arg and/or extra may point to
592  * temporary data.
593  */
594 void
so_pr_ctlinput(struct protosw * pr,int cmd,struct sockaddr * arg,void * extra)595 so_pr_ctlinput(struct protosw *pr, int cmd, struct sockaddr *arg, void *extra)
596 {
597           struct netmsg_pr_ctlinput msg;
598           lwkt_port_t port;
599           int cpuid;
600 
601           port = so_pr_ctlport(pr, cmd, arg, extra, &cpuid);
602           if (port == NULL)
603                     return;
604           netmsg_init(&msg.base, NULL, &curthread->td_msgport,
605                         0, pr->pr_ctlinput);
606           msg.nm_cmd = cmd;
607           msg.nm_direct = 0;
608           msg.nm_arg = arg;
609           msg.nm_extra = extra;
610           lwkt_domsg(port, &msg.base.lmsg, 0);
611 }
612 
613 void
so_pr_ctlinput_direct(struct protosw * pr,int cmd,struct sockaddr * arg,void * extra)614 so_pr_ctlinput_direct(struct protosw *pr, int cmd, struct sockaddr *arg,
615     void *extra)
616 {
617           struct netmsg_pr_ctlinput msg;
618           netisr_fn_t func;
619           lwkt_port_t port;
620           int cpuid;
621 
622           port = so_pr_ctlport(pr, cmd, arg, extra, &cpuid);
623           if (port == NULL)
624                     return;
625           if (cpuid != netisr_ncpus && cpuid != mycpuid)
626                     return;
627 
628           func = pr->pr_ctlinput;
629           netmsg_init(&msg.base, NULL, &netisr_adone_rport, 0, func);
630           msg.base.lmsg.ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
631           msg.base.lmsg.ms_flags |= MSGF_SYNC;
632           msg.nm_cmd = cmd;
633           msg.nm_direct = 1;
634           msg.nm_arg = arg;
635           msg.nm_extra = extra;
636           func((netmsg_t)&msg);
637           KKASSERT(msg.base.lmsg.ms_flags & MSGF_DONE);
638 }
639 
640 /*
641  * If we convert all the protosw pr_ functions for all the protocols
642  * to take a message directly, this layer can go away.  For the moment
643  * our dispatcher ignores the return value, but since we are handling
644  * the replymsg ourselves we return EASYNC by convention.
645  */
646 
647 /*
648  * Handle a predicate event request.  This function is only called once
649  * when the predicate message queueing request is received.
650  */
651 void
netmsg_so_notify(netmsg_t msg)652 netmsg_so_notify(netmsg_t msg)
653 {
654           struct socket *so = msg->base.nm_so;
655           struct signalsockbuf *ssb;
656 
657           ssb = (msg->notify.nm_etype & NM_REVENT) ? &so->so_rcv : &so->so_snd;
658 
659           /*
660            * Reply immediately if the event has occured, otherwise queue the
661            * request.
662            *
663            * NOTE: Socket can change if this is an accept predicate so cache
664            *         the token.
665            */
666           lwkt_getpooltoken(so);
667           atomic_set_int(&ssb->ssb_flags, SSB_MEVENT);
668           if (msg->notify.nm_predicate(&msg->notify)) {
669                     if (TAILQ_EMPTY(&ssb->ssb_mlist))
670                               atomic_clear_int(&ssb->ssb_flags, SSB_MEVENT);
671                     lwkt_relpooltoken(so);
672                     lwkt_replymsg(&msg->base.lmsg,
673                                     msg->base.lmsg.ms_error);
674           } else {
675                     TAILQ_INSERT_TAIL(&ssb->ssb_mlist, &msg->notify, nm_list);
676                     /*
677                      * NOTE:
678                      * If predict ever blocks, 'tok' will be released, so
679                      * SSB_MEVENT set beforehand could have been cleared
680                      * when we reach here.  In case that happens, we set
681                      * SSB_MEVENT again, after the notify has been queued.
682                      */
683                     atomic_set_int(&ssb->ssb_flags, SSB_MEVENT);
684                     lwkt_relpooltoken(so);
685           }
686 }
687 
688 /*
689  * Called by doio when trying to abort a netmsg_so_notify message.
690  * Unlike the other functions this one is dispatched directly by
691  * the LWKT subsystem, so it takes a lwkt_msg_t as an argument.
692  *
693  * The original message, lmsg, is under the control of the caller and
694  * will not be destroyed until we return so we can safely reference it
695  * in our synchronous abort request.
696  *
697  * This part of the abort request occurs on the originating cpu which
698  * means we may race the message flags and the original message may
699  * not even have been processed by the target cpu yet.
700  */
701 void
netmsg_so_notify_doabort(lwkt_msg_t lmsg)702 netmsg_so_notify_doabort(lwkt_msg_t lmsg)
703 {
704           struct netmsg_so_notify_abort msg;
705 
706           if ((lmsg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0) {
707                     const struct netmsg_base *nmsg =
708                         (const struct netmsg_base *)lmsg;
709 
710                     netmsg_init(&msg.base, nmsg->nm_so, &curthread->td_msgport,
711                                   0, netmsg_so_notify_abort);
712                     msg.nm_notifymsg = (void *)lmsg;
713                     lwkt_domsg(lmsg->ms_target_port, &msg.base.lmsg, 0);
714           }
715 }
716 
717 /*
718  * Predicate requests can be aborted.  This function is only called once
719  * and will interlock against processing/reply races (since such races
720  * occur on the same thread that controls the port where the abort is
721  * requeued).
722  *
723  * This part of the abort request occurs on the target cpu.  The message
724  * flags must be tested again in case the test that we did on the
725  * originating cpu raced.  Since messages are handled in sequence, the
726  * original message will have already been handled by the loop and either
727  * replied to or queued.
728  *
729  * We really only need to interlock with MSGF_REPLY (a bit that is set on
730  * our cpu when we reply).  Note that MSGF_DONE is not set until the
731  * reply reaches the originating cpu.  Test both bits anyway.
732  */
733 void
netmsg_so_notify_abort(netmsg_t msg)734 netmsg_so_notify_abort(netmsg_t msg)
735 {
736           struct netmsg_so_notify_abort *abrtmsg = &msg->notify_abort;
737           struct netmsg_so_notify *nmsg = abrtmsg->nm_notifymsg;
738           struct signalsockbuf *ssb;
739 
740           /*
741            * The original notify message is not destroyed until after the
742            * abort request is returned, so we can check its state.
743            */
744           lwkt_getpooltoken(nmsg->base.nm_so);
745           if ((nmsg->base.lmsg.ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0) {
746                     ssb = (nmsg->nm_etype & NM_REVENT) ?
747                                         &nmsg->base.nm_so->so_rcv :
748                                         &nmsg->base.nm_so->so_snd;
749                     TAILQ_REMOVE(&ssb->ssb_mlist, nmsg, nm_list);
750                     lwkt_relpooltoken(nmsg->base.nm_so);
751                     lwkt_replymsg(&nmsg->base.lmsg, EINTR);
752           } else {
753                     lwkt_relpooltoken(nmsg->base.nm_so);
754           }
755 
756           /*
757            * Reply to the abort message
758            */
759           lwkt_replymsg(&abrtmsg->base.lmsg, 0);
760 }
761 
762 void
so_async_rcvd_reply(struct socket * so)763 so_async_rcvd_reply(struct socket *so)
764 {
765           /*
766            * Spinlock safe, reply runs to degenerate lwkt_null_replyport()
767            */
768           spin_lock(&so->so_rcvd_spin);
769           lwkt_replymsg(&so->so_rcvd_msg.base.lmsg, 0);
770           spin_unlock(&so->so_rcvd_spin);
771 }
772 
773 void
so_async_rcvd_drop(struct socket * so)774 so_async_rcvd_drop(struct socket *so)
775 {
776           lwkt_msg_t lmsg = &so->so_rcvd_msg.base.lmsg;
777 
778           /*
779            * Spinlock safe, drop runs to degenerate lwkt_spin_dropmsg()
780            */
781           spin_lock(&so->so_rcvd_spin);
782           so->so_rcvd_msg.nm_pru_flags |= PRUR_DEAD;
783 again:
784           lwkt_dropmsg(lmsg);
785           if ((lmsg->ms_flags & MSGF_DONE) == 0) {
786                     ++async_rcvd_drop_race;
787                     ssleep(so, &so->so_rcvd_spin, 0, "soadrop", 1);
788                     goto again;
789           }
790           spin_unlock(&so->so_rcvd_spin);
791 }
792