1 /* $OpenBSD: sys_pipe.c,v 1.46 2004/01/06 04:18:18 tedu Exp $ */
2
3 /*
4 * Copyright (c) 1996 John S. Dyson
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice immediately at the beginning of the file, without modification,
12 * this list of conditions, and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Absolutely no warranty of function or purpose is made by the author
17 * John S. Dyson.
18 * 4. Modifications may be freely made to this file if the above conditions
19 * are met.
20 */
21
22 /*
23 * This file contains a high-performance replacement for the socket-based
24 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
25 * all features of sockets, but does do everything that pipes normally
26 * do.
27 */
28
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/proc.h>
32 #include <sys/file.h>
33 #include <sys/filedesc.h>
34 #include <sys/pool.h>
35 #include <sys/ioctl.h>
36 #include <sys/stat.h>
37 #include <sys/signalvar.h>
38 #include <sys/mount.h>
39 #include <sys/syscallargs.h>
40 #include <sys/event.h>
41 #include <sys/lock.h>
42 #include <sys/poll.h>
43
44 #include <uvm/uvm_extern.h>
45
46 #include <sys/pipe.h>
47
48 /*
49 * interfaces to the outside world
50 */
51 int pipe_read(struct file *, off_t *, struct uio *, struct ucred *);
52 int pipe_write(struct file *, off_t *, struct uio *, struct ucred *);
53 int pipe_close(struct file *, struct proc *);
54 int pipe_poll(struct file *, int events, struct proc *);
55 int pipe_kqfilter(struct file *fp, struct knote *kn);
56 int pipe_ioctl(struct file *, u_long, caddr_t, struct proc *);
57 int pipe_stat(struct file *fp, struct stat *ub, struct proc *p);
58
59 static struct fileops pipeops = {
60 pipe_read, pipe_write, pipe_ioctl, pipe_poll, pipe_kqfilter,
61 pipe_stat, pipe_close
62 };
63
64 void filt_pipedetach(struct knote *kn);
65 int filt_piperead(struct knote *kn, long hint);
66 int filt_pipewrite(struct knote *kn, long hint);
67
68 struct filterops pipe_rfiltops =
69 { 1, NULL, filt_pipedetach, filt_piperead };
70 struct filterops pipe_wfiltops =
71 { 1, NULL, filt_pipedetach, filt_pipewrite };
72
73 /*
74 * Default pipe buffer size(s), this can be kind-of large now because pipe
75 * space is pageable. The pipe code will try to maintain locality of
76 * reference for performance reasons, so small amounts of outstanding I/O
77 * will not wipe the cache.
78 */
79 #define MINPIPESIZE (PIPE_SIZE/3)
80
81 /*
82 * Limit the number of "big" pipes
83 */
84 #define LIMITBIGPIPES 32
85 int nbigpipe;
86 static int amountpipekva;
87
88 struct pool pipe_pool;
89
90 void pipeclose(struct pipe *);
91 void pipe_free_kmem(struct pipe *);
92 int pipe_create(struct pipe *);
93 static __inline int pipelock(struct pipe *);
94 static __inline void pipeunlock(struct pipe *);
95 static __inline void pipeselwakeup(struct pipe *);
96 int pipespace(struct pipe *, u_int);
97
98 /*
99 * The pipe system call for the DTYPE_PIPE type of pipes
100 */
101
102 /* ARGSUSED */
103 int
sys_opipe(p,v,retval)104 sys_opipe(p, v, retval)
105 struct proc *p;
106 void *v;
107 register_t *retval;
108 {
109 struct filedesc *fdp = p->p_fd;
110 struct file *rf, *wf;
111 struct pipe *rpipe, *wpipe;
112 int fd, error;
113
114 fdplock(fdp, p);
115
116 rpipe = pool_get(&pipe_pool, PR_WAITOK);
117 error = pipe_create(rpipe);
118 if (error != 0)
119 goto free1;
120 wpipe = pool_get(&pipe_pool, PR_WAITOK);
121 error = pipe_create(wpipe);
122 if (error != 0)
123 goto free2;
124
125 error = falloc(p, &rf, &fd);
126 if (error != 0)
127 goto free2;
128 rf->f_flag = FREAD | FWRITE;
129 rf->f_type = DTYPE_PIPE;
130 rf->f_data = rpipe;
131 rf->f_ops = &pipeops;
132 retval[0] = fd;
133
134 error = falloc(p, &wf, &fd);
135 if (error != 0)
136 goto free3;
137 wf->f_flag = FREAD | FWRITE;
138 wf->f_type = DTYPE_PIPE;
139 wf->f_data = wpipe;
140 wf->f_ops = &pipeops;
141 retval[1] = fd;
142
143 rpipe->pipe_peer = wpipe;
144 wpipe->pipe_peer = rpipe;
145
146 FILE_SET_MATURE(rf);
147 FILE_SET_MATURE(wf);
148
149 fdpunlock(fdp);
150 return (0);
151
152 free3:
153 fdremove(fdp, retval[0]);
154 closef(rf, p);
155 rpipe = NULL;
156 free2:
157 (void)pipeclose(wpipe);
158 free1:
159 if (rpipe != NULL)
160 (void)pipeclose(rpipe);
161 fdpunlock(fdp);
162 return (error);
163 }
164
165 /*
166 * Allocate kva for pipe circular buffer, the space is pageable
167 * This routine will 'realloc' the size of a pipe safely, if it fails
168 * it will retain the old buffer.
169 * If it fails it will return ENOMEM.
170 */
171 int
pipespace(cpipe,size)172 pipespace(cpipe, size)
173 struct pipe *cpipe;
174 u_int size;
175 {
176 caddr_t buffer;
177
178 buffer = (caddr_t)uvm_km_valloc(kernel_map, size);
179 if (buffer == NULL) {
180 return (ENOMEM);
181 }
182
183 /* free old resources if we are resizing */
184 pipe_free_kmem(cpipe);
185 cpipe->pipe_buffer.buffer = buffer;
186 cpipe->pipe_buffer.size = size;
187 cpipe->pipe_buffer.in = 0;
188 cpipe->pipe_buffer.out = 0;
189 cpipe->pipe_buffer.cnt = 0;
190
191 amountpipekva += cpipe->pipe_buffer.size;
192
193 return (0);
194 }
195
196 /*
197 * initialize and allocate VM and memory for pipe
198 */
199 int
pipe_create(cpipe)200 pipe_create(cpipe)
201 struct pipe *cpipe;
202 {
203 int error;
204
205 /* so pipe_free_kmem() doesn't follow junk pointer */
206 cpipe->pipe_buffer.buffer = NULL;
207 /*
208 * protect so pipeclose() doesn't follow a junk pointer
209 * if pipespace() fails.
210 */
211 bzero(&cpipe->pipe_sel, sizeof cpipe->pipe_sel);
212 cpipe->pipe_state = 0;
213 cpipe->pipe_peer = NULL;
214 cpipe->pipe_busy = 0;
215
216 error = pipespace(cpipe, PIPE_SIZE);
217 if (error != 0)
218 return (error);
219
220 microtime(&cpipe->pipe_ctime);
221 cpipe->pipe_atime = cpipe->pipe_ctime;
222 cpipe->pipe_mtime = cpipe->pipe_ctime;
223 cpipe->pipe_pgid = NO_PID;
224
225 return (0);
226 }
227
228
229 /*
230 * lock a pipe for I/O, blocking other access
231 */
232 static __inline int
pipelock(cpipe)233 pipelock(cpipe)
234 struct pipe *cpipe;
235 {
236 int error;
237 while (cpipe->pipe_state & PIPE_LOCK) {
238 cpipe->pipe_state |= PIPE_LWANT;
239 if ((error = tsleep(cpipe, PRIBIO|PCATCH, "pipelk", 0)))
240 return error;
241 }
242 cpipe->pipe_state |= PIPE_LOCK;
243 return 0;
244 }
245
246 /*
247 * unlock a pipe I/O lock
248 */
249 static __inline void
pipeunlock(cpipe)250 pipeunlock(cpipe)
251 struct pipe *cpipe;
252 {
253 cpipe->pipe_state &= ~PIPE_LOCK;
254 if (cpipe->pipe_state & PIPE_LWANT) {
255 cpipe->pipe_state &= ~PIPE_LWANT;
256 wakeup(cpipe);
257 }
258 }
259
260 static __inline void
pipeselwakeup(cpipe)261 pipeselwakeup(cpipe)
262 struct pipe *cpipe;
263 {
264 if (cpipe->pipe_state & PIPE_SEL) {
265 cpipe->pipe_state &= ~PIPE_SEL;
266 selwakeup(&cpipe->pipe_sel);
267 }
268 if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_pgid != NO_PID)
269 gsignal(cpipe->pipe_pgid, SIGIO);
270 KNOTE(&cpipe->pipe_sel.si_note, 0);
271 }
272
273 /* ARGSUSED */
274 int
pipe_read(fp,poff,uio,cred)275 pipe_read(fp, poff, uio, cred)
276 struct file *fp;
277 off_t *poff;
278 struct uio *uio;
279 struct ucred *cred;
280 {
281 struct pipe *rpipe = (struct pipe *) fp->f_data;
282 int error;
283 int nread = 0;
284 int size;
285
286 error = pipelock(rpipe);
287 if (error)
288 goto unlocked_error;
289
290 ++rpipe->pipe_busy;
291
292 while (uio->uio_resid) {
293 /*
294 * normal pipe buffer receive
295 */
296 if (rpipe->pipe_buffer.cnt > 0) {
297 size = rpipe->pipe_buffer.size - rpipe->pipe_buffer.out;
298 if (size > rpipe->pipe_buffer.cnt)
299 size = rpipe->pipe_buffer.cnt;
300 if (size > uio->uio_resid)
301 size = uio->uio_resid;
302 error = uiomove(&rpipe->pipe_buffer.buffer[rpipe->pipe_buffer.out],
303 size, uio);
304 if (error) {
305 break;
306 }
307 rpipe->pipe_buffer.out += size;
308 if (rpipe->pipe_buffer.out >= rpipe->pipe_buffer.size)
309 rpipe->pipe_buffer.out = 0;
310
311 rpipe->pipe_buffer.cnt -= size;
312 /*
313 * If there is no more to read in the pipe, reset
314 * its pointers to the beginning. This improves
315 * cache hit stats.
316 */
317 if (rpipe->pipe_buffer.cnt == 0) {
318 rpipe->pipe_buffer.in = 0;
319 rpipe->pipe_buffer.out = 0;
320 }
321 nread += size;
322 } else {
323 /*
324 * detect EOF condition
325 * read returns 0 on EOF, no need to set error
326 */
327 if (rpipe->pipe_state & PIPE_EOF)
328 break;
329
330 /*
331 * If the "write-side" has been blocked, wake it up now.
332 */
333 if (rpipe->pipe_state & PIPE_WANTW) {
334 rpipe->pipe_state &= ~PIPE_WANTW;
335 wakeup(rpipe);
336 }
337
338 /*
339 * Break if some data was read.
340 */
341 if (nread > 0)
342 break;
343
344 /*
345 * Unlock the pipe buffer for our remaining processing.
346 * We will either break out with an error or we will
347 * sleep and relock to loop.
348 */
349 pipeunlock(rpipe);
350
351 /*
352 * Handle non-blocking mode operation or
353 * wait for more data.
354 */
355 if (fp->f_flag & FNONBLOCK) {
356 error = EAGAIN;
357 } else {
358 rpipe->pipe_state |= PIPE_WANTR;
359 if ((error = tsleep(rpipe, PRIBIO|PCATCH, "piperd", 0)) == 0)
360 error = pipelock(rpipe);
361 }
362 if (error)
363 goto unlocked_error;
364 }
365 }
366 pipeunlock(rpipe);
367
368 if (error == 0)
369 microtime(&rpipe->pipe_atime);
370 unlocked_error:
371 --rpipe->pipe_busy;
372
373 /*
374 * PIPE_WANT processing only makes sense if pipe_busy is 0.
375 */
376 if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
377 rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
378 wakeup(rpipe);
379 } else if (rpipe->pipe_buffer.cnt < MINPIPESIZE) {
380 /*
381 * Handle write blocking hysteresis.
382 */
383 if (rpipe->pipe_state & PIPE_WANTW) {
384 rpipe->pipe_state &= ~PIPE_WANTW;
385 wakeup(rpipe);
386 }
387 }
388
389 if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF)
390 pipeselwakeup(rpipe);
391
392 return (error);
393 }
394
395 int
pipe_write(fp,poff,uio,cred)396 pipe_write(fp, poff, uio, cred)
397 struct file *fp;
398 off_t *poff;
399 struct uio *uio;
400 struct ucred *cred;
401 {
402 int error = 0;
403 int orig_resid;
404
405 struct pipe *wpipe, *rpipe;
406
407 rpipe = (struct pipe *) fp->f_data;
408 wpipe = rpipe->pipe_peer;
409
410 /*
411 * detect loss of pipe read side, issue SIGPIPE if lost.
412 */
413 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
414 return (EPIPE);
415 }
416 ++wpipe->pipe_busy;
417
418 /*
419 * If it is advantageous to resize the pipe buffer, do
420 * so.
421 */
422 if ((uio->uio_resid > PIPE_SIZE) &&
423 (nbigpipe < LIMITBIGPIPES) &&
424 (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
425 (wpipe->pipe_buffer.cnt == 0)) {
426
427 if ((error = pipelock(wpipe)) == 0) {
428 if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
429 nbigpipe++;
430 pipeunlock(wpipe);
431 }
432 }
433
434 /*
435 * If an early error occured unbusy and return, waking up any pending
436 * readers.
437 */
438 if (error) {
439 --wpipe->pipe_busy;
440 if ((wpipe->pipe_busy == 0) &&
441 (wpipe->pipe_state & PIPE_WANT)) {
442 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
443 wakeup(wpipe);
444 }
445 return (error);
446 }
447
448 orig_resid = uio->uio_resid;
449
450 while (uio->uio_resid) {
451 int space;
452
453 retrywrite:
454 if (wpipe->pipe_state & PIPE_EOF) {
455 error = EPIPE;
456 break;
457 }
458
459 space = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
460
461 /* Writes of size <= PIPE_BUF must be atomic. */
462 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
463 space = 0;
464
465 if (space > 0) {
466 if ((error = pipelock(wpipe)) == 0) {
467 int size; /* Transfer size */
468 int segsize; /* first segment to transfer */
469
470 /*
471 * If a process blocked in uiomove, our
472 * value for space might be bad.
473 *
474 * XXX will we be ok if the reader has gone
475 * away here?
476 */
477 if (space > wpipe->pipe_buffer.size -
478 wpipe->pipe_buffer.cnt) {
479 pipeunlock(wpipe);
480 goto retrywrite;
481 }
482
483 /*
484 * Transfer size is minimum of uio transfer
485 * and free space in pipe buffer.
486 */
487 if (space > uio->uio_resid)
488 size = uio->uio_resid;
489 else
490 size = space;
491 /*
492 * First segment to transfer is minimum of
493 * transfer size and contiguous space in
494 * pipe buffer. If first segment to transfer
495 * is less than the transfer size, we've got
496 * a wraparound in the buffer.
497 */
498 segsize = wpipe->pipe_buffer.size -
499 wpipe->pipe_buffer.in;
500 if (segsize > size)
501 segsize = size;
502
503 /* Transfer first segment */
504
505 error = uiomove(&wpipe->pipe_buffer.buffer[wpipe->pipe_buffer.in],
506 segsize, uio);
507
508 if (error == 0 && segsize < size) {
509 /*
510 * Transfer remaining part now, to
511 * support atomic writes. Wraparound
512 * happened.
513 */
514 #ifdef DIAGNOSTIC
515 if (wpipe->pipe_buffer.in + segsize !=
516 wpipe->pipe_buffer.size)
517 panic("Expected pipe buffer wraparound disappeared");
518 #endif
519
520 error = uiomove(&wpipe->pipe_buffer.buffer[0],
521 size - segsize, uio);
522 }
523 if (error == 0) {
524 wpipe->pipe_buffer.in += size;
525 if (wpipe->pipe_buffer.in >=
526 wpipe->pipe_buffer.size) {
527 #ifdef DIAGNOSTIC
528 if (wpipe->pipe_buffer.in != size - segsize + wpipe->pipe_buffer.size)
529 panic("Expected wraparound bad");
530 #endif
531 wpipe->pipe_buffer.in = size - segsize;
532 }
533
534 wpipe->pipe_buffer.cnt += size;
535 #ifdef DIAGNOSTIC
536 if (wpipe->pipe_buffer.cnt > wpipe->pipe_buffer.size)
537 panic("Pipe buffer overflow");
538 #endif
539 }
540 pipeunlock(wpipe);
541 }
542 if (error)
543 break;
544 } else {
545 /*
546 * If the "read-side" has been blocked, wake it up now.
547 */
548 if (wpipe->pipe_state & PIPE_WANTR) {
549 wpipe->pipe_state &= ~PIPE_WANTR;
550 wakeup(wpipe);
551 }
552
553 /*
554 * don't block on non-blocking I/O
555 */
556 if (fp->f_flag & FNONBLOCK) {
557 error = EAGAIN;
558 break;
559 }
560
561 /*
562 * We have no more space and have something to offer,
563 * wake up select/poll.
564 */
565 pipeselwakeup(wpipe);
566
567 wpipe->pipe_state |= PIPE_WANTW;
568 error = tsleep(wpipe, (PRIBIO + 1)|PCATCH,
569 "pipewr", 0);
570 if (error)
571 break;
572 /*
573 * If read side wants to go away, we just issue a
574 * signal to ourselves.
575 */
576 if (wpipe->pipe_state & PIPE_EOF) {
577 error = EPIPE;
578 break;
579 }
580 }
581 }
582
583 --wpipe->pipe_busy;
584
585 if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
586 wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
587 wakeup(wpipe);
588 } else if (wpipe->pipe_buffer.cnt > 0) {
589 /*
590 * If we have put any characters in the buffer, we wake up
591 * the reader.
592 */
593 if (wpipe->pipe_state & PIPE_WANTR) {
594 wpipe->pipe_state &= ~PIPE_WANTR;
595 wakeup(wpipe);
596 }
597 }
598
599 /*
600 * Don't return EPIPE if I/O was successful
601 */
602 if ((wpipe->pipe_buffer.cnt == 0) &&
603 (uio->uio_resid == 0) &&
604 (error == EPIPE)) {
605 error = 0;
606 }
607
608 if (error == 0)
609 microtime(&wpipe->pipe_mtime);
610 /*
611 * We have something to offer, wake up select/poll.
612 */
613 if (wpipe->pipe_buffer.cnt)
614 pipeselwakeup(wpipe);
615
616 return (error);
617 }
618
619 /*
620 * we implement a very minimal set of ioctls for compatibility with sockets.
621 */
622 int
pipe_ioctl(fp,cmd,data,p)623 pipe_ioctl(fp, cmd, data, p)
624 struct file *fp;
625 u_long cmd;
626 caddr_t data;
627 struct proc *p;
628 {
629 struct pipe *mpipe = (struct pipe *)fp->f_data;
630
631 switch (cmd) {
632
633 case FIONBIO:
634 return (0);
635
636 case FIOASYNC:
637 if (*(int *)data) {
638 mpipe->pipe_state |= PIPE_ASYNC;
639 } else {
640 mpipe->pipe_state &= ~PIPE_ASYNC;
641 }
642 return (0);
643
644 case FIONREAD:
645 *(int *)data = mpipe->pipe_buffer.cnt;
646 return (0);
647
648 case SIOCSPGRP:
649 mpipe->pipe_pgid = *(int *)data;
650 return (0);
651
652 case SIOCGPGRP:
653 *(int *)data = mpipe->pipe_pgid;
654 return (0);
655
656 }
657 return (ENOTTY);
658 }
659
660 int
pipe_poll(fp,events,p)661 pipe_poll(fp, events, p)
662 struct file *fp;
663 int events;
664 struct proc *p;
665 {
666 struct pipe *rpipe = (struct pipe *)fp->f_data;
667 struct pipe *wpipe;
668 int revents = 0;
669
670 wpipe = rpipe->pipe_peer;
671 if (events & (POLLIN | POLLRDNORM)) {
672 if ((rpipe->pipe_buffer.cnt > 0) ||
673 (rpipe->pipe_state & PIPE_EOF))
674 revents |= events & (POLLIN | POLLRDNORM);
675 }
676
677 /* NOTE: POLLHUP and POLLOUT/POLLWRNORM are mutually exclusive */
678 if ((rpipe->pipe_state & PIPE_EOF) ||
679 (wpipe == NULL) ||
680 (wpipe->pipe_state & PIPE_EOF))
681 revents |= POLLHUP;
682 else if (events & (POLLOUT | POLLWRNORM)) {
683 if ((wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) >= PIPE_BUF)
684 revents |= events & (POLLOUT | POLLWRNORM);
685 }
686
687 if (revents == 0) {
688 if (events & (POLLIN | POLLRDNORM)) {
689 selrecord(p, &rpipe->pipe_sel);
690 rpipe->pipe_state |= PIPE_SEL;
691 }
692 if (events & (POLLOUT | POLLWRNORM)) {
693 selrecord(p, &wpipe->pipe_sel);
694 wpipe->pipe_state |= PIPE_SEL;
695 }
696 }
697 return (revents);
698 }
699
700 int
pipe_stat(fp,ub,p)701 pipe_stat(fp, ub, p)
702 struct file *fp;
703 struct stat *ub;
704 struct proc *p;
705 {
706 struct pipe *pipe = (struct pipe *)fp->f_data;
707
708 bzero(ub, sizeof(*ub));
709 ub->st_mode = S_IFIFO;
710 ub->st_blksize = pipe->pipe_buffer.size;
711 ub->st_size = pipe->pipe_buffer.cnt;
712 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
713 TIMEVAL_TO_TIMESPEC(&pipe->pipe_atime, &ub->st_atimespec);
714 TIMEVAL_TO_TIMESPEC(&pipe->pipe_mtime, &ub->st_mtimespec);
715 TIMEVAL_TO_TIMESPEC(&pipe->pipe_ctime, &ub->st_ctimespec);
716 ub->st_uid = fp->f_cred->cr_uid;
717 ub->st_gid = fp->f_cred->cr_gid;
718 /*
719 * Left as 0: st_dev, st_ino, st_nlink, st_rdev, st_flags, st_gen.
720 * XXX (st_dev, st_ino) should be unique.
721 */
722 return (0);
723 }
724
725 /* ARGSUSED */
726 int
pipe_close(fp,p)727 pipe_close(fp, p)
728 struct file *fp;
729 struct proc *p;
730 {
731 struct pipe *cpipe = (struct pipe *)fp->f_data;
732
733 fp->f_ops = NULL;
734 fp->f_data = NULL;
735 pipeclose(cpipe);
736 return (0);
737 }
738
739 void
pipe_free_kmem(cpipe)740 pipe_free_kmem(cpipe)
741 struct pipe *cpipe;
742 {
743 if (cpipe->pipe_buffer.buffer != NULL) {
744 if (cpipe->pipe_buffer.size > PIPE_SIZE)
745 --nbigpipe;
746 amountpipekva -= cpipe->pipe_buffer.size;
747 uvm_km_free(kernel_map, (vaddr_t)cpipe->pipe_buffer.buffer,
748 cpipe->pipe_buffer.size);
749 cpipe->pipe_buffer.buffer = NULL;
750 }
751 }
752
753 /*
754 * shutdown the pipe
755 */
756 void
pipeclose(cpipe)757 pipeclose(cpipe)
758 struct pipe *cpipe;
759 {
760 struct pipe *ppipe;
761 if (cpipe) {
762
763 pipeselwakeup(cpipe);
764
765 /*
766 * If the other side is blocked, wake it up saying that
767 * we want to close it down.
768 */
769 while (cpipe->pipe_busy) {
770 wakeup(cpipe);
771 cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
772 tsleep(cpipe, PRIBIO, "pipecl", 0);
773 }
774
775 /*
776 * Disconnect from peer
777 */
778 if ((ppipe = cpipe->pipe_peer) != NULL) {
779 pipeselwakeup(ppipe);
780
781 ppipe->pipe_state |= PIPE_EOF;
782 wakeup(ppipe);
783 KNOTE(&ppipe->pipe_sel.si_note, 0);
784 ppipe->pipe_peer = NULL;
785 }
786
787 /*
788 * free resources
789 */
790 pipe_free_kmem(cpipe);
791 pool_put(&pipe_pool, cpipe);
792 }
793 }
794
795 int
pipe_kqfilter(struct file * fp,struct knote * kn)796 pipe_kqfilter(struct file *fp, struct knote *kn)
797 {
798 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
799 struct pipe *wpipe = rpipe->pipe_peer;
800
801 switch (kn->kn_filter) {
802 case EVFILT_READ:
803 kn->kn_fop = &pipe_rfiltops;
804 SLIST_INSERT_HEAD(&rpipe->pipe_sel.si_note, kn, kn_selnext);
805 break;
806 case EVFILT_WRITE:
807 if (wpipe == NULL)
808 /* other end of pipe has been closed */
809 return (1);
810 kn->kn_fop = &pipe_wfiltops;
811 SLIST_INSERT_HEAD(&wpipe->pipe_sel.si_note, kn, kn_selnext);
812 break;
813 default:
814 return (1);
815 }
816
817 return (0);
818 }
819
820 void
filt_pipedetach(struct knote * kn)821 filt_pipedetach(struct knote *kn)
822 {
823 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
824 struct pipe *wpipe = rpipe->pipe_peer;
825
826 switch (kn->kn_filter) {
827 case EVFILT_READ:
828 SLIST_REMOVE(&rpipe->pipe_sel.si_note, kn, knote, kn_selnext);
829 break;
830 case EVFILT_WRITE:
831 if (wpipe == NULL)
832 return;
833 SLIST_REMOVE(&wpipe->pipe_sel.si_note, kn, knote, kn_selnext);
834 break;
835 }
836 }
837
838 /*ARGSUSED*/
839 int
filt_piperead(struct knote * kn,long hint)840 filt_piperead(struct knote *kn, long hint)
841 {
842 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
843 struct pipe *wpipe = rpipe->pipe_peer;
844
845 kn->kn_data = rpipe->pipe_buffer.cnt;
846
847 if ((rpipe->pipe_state & PIPE_EOF) ||
848 (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
849 kn->kn_flags |= EV_EOF;
850 return (1);
851 }
852 return (kn->kn_data > 0);
853 }
854
855 /*ARGSUSED*/
856 int
filt_pipewrite(struct knote * kn,long hint)857 filt_pipewrite(struct knote *kn, long hint)
858 {
859 struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
860 struct pipe *wpipe = rpipe->pipe_peer;
861
862 if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
863 kn->kn_data = 0;
864 kn->kn_flags |= EV_EOF;
865 return (1);
866 }
867 kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
868
869 return (kn->kn_data >= PIPE_BUF);
870 }
871
872 void
pipe_init()873 pipe_init()
874 {
875 pool_init(&pipe_pool, sizeof(struct pipe), 0, 0, 0, "pipepl",
876 &pool_allocator_nointr);
877 }
878
879