xref: /dragonfly/sys/kern/kern_mpipe.c (revision dfdd013d83912ace3b83e4c45b180a6c1d41212c)
1 /*
2  * Copyright (c) 2003,2004,2020 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 
35 #include <sys/param.h>
36 #include <sys/systm.h>
37 #include <sys/kernel.h>
38 #include <sys/slaballoc.h>
39 #include <sys/malloc.h>
40 #include <sys/mbuf.h>
41 #include <sys/vmmeter.h>
42 #include <sys/lock.h>
43 #include <sys/thread.h>
44 #include <sys/globaldata.h>
45 #include <sys/mpipe.h>
46 #include <sys/kthread.h>
47 
48 struct mpipe_callback {
49           STAILQ_ENTRY(mpipe_callback) entry;
50           void (*func)(void *arg1, void *arg2);
51           void *arg1;
52           void *arg2;
53 };
54 
55 static MALLOC_DEFINE(M_MPIPEARY, "MPipe Array", "Auxiliary MPIPE structure");
56 
57 static void mpipe_thread(void *arg);
58 
59 /*
60  * Initialize a malloc pipeline for the specified malloc type and allocation
61  * size.  Create an array to cache up to nom_count buffers and preallocate
62  * them.
63  */
64 void
mpipe_init(malloc_pipe_t mpipe,malloc_type_t type,int bytes,int nnom,int nmax,int mpflags,void (* construct)(void *,void *),void (* deconstruct)(void *,void *),void * priv)65 mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
66           int nnom, int nmax,
67           int mpflags,
68           void (*construct)(void *, void *),
69           void (*deconstruct)(void *, void *),
70           void *priv)
71 {
72     int n;
73 
74     if (nnom < 1)
75           nnom = 1;
76     if (nmax < 0)
77           nmax = 0x7FFF0000;  /* some very large number */
78     if (nmax < nnom)
79           nmax = nnom;
80     bzero(mpipe, sizeof(struct malloc_pipe));
81     mpipe->type = type;
82     mpipe->bytes = bytes;
83     mpipe->mpflags = mpflags;
84     mpipe->construct = construct;
85     mpipe->deconstruct = deconstruct;
86     mpipe->priv = priv;
87     if ((mpflags & MPF_NOZERO) == 0)
88           mpipe->mflags |= M_ZERO;
89     if (mpflags & MPF_INT)
90           mpipe->mflags |= M_USE_RESERVE | M_USE_INTERRUPT_RESERVE;
91     mpipe->ary_count = nnom;
92     mpipe->max_count = nmax;
93     mpipe->array = kmalloc(nnom * sizeof(mpipe->array[0]), M_MPIPEARY,
94                                   M_WAITOK | M_ZERO);
95 
96     while (mpipe->free_count < nnom) {
97           n = mpipe->free_count;
98           mpipe->array[n] = kmalloc(bytes, mpipe->type, M_WAITOK | mpipe->mflags);
99           if (construct)
100               construct(mpipe->array[n], priv);
101           ++mpipe->free_count;
102           ++mpipe->total_count;
103     }
104     STAILQ_INIT(&mpipe->queue);
105 
106     lwkt_token_init(&mpipe->token, "mpipe token");
107 
108     /*
109      * Create a support thread for the mpipe queue
110      */
111     if (mpflags & MPF_CALLBACK) {
112               kthread_create(mpipe_thread, mpipe, &mpipe->thread,
113                                  "mpipe_%s", type->ks_shortdesc);
114     }
115 }
116 
117 /*
118  * Destroy a previously initialized mpipe.  This routine can also safely be
119  * called on an uninitialized mpipe structure if it was zero'd or mpipe_done()
120  * was previously called on it.
121  */
122 void
mpipe_done(malloc_pipe_t mpipe)123 mpipe_done(malloc_pipe_t mpipe)
124 {
125     void *buf;
126     int n;
127 
128     KKASSERT(mpipe->free_count == mpipe->total_count);      /* no outstanding mem */
129 
130     /*
131      * Clean up the kthread
132      */
133     lwkt_gettoken(&mpipe->token);
134     mpipe->mpflags |= MPF_EXITING;
135     while (mpipe->thread) {
136           wakeup(&mpipe->queue);
137           tsleep(mpipe, 0, "mpipex", 1);
138     }
139 
140     /*
141      * Clean up the mpipe buffers
142      */
143     for (n = mpipe->free_count - 1; n >= 0; --n) {
144           buf = mpipe->array[n];
145           mpipe->array[n] = NULL;
146           KKASSERT(buf != NULL);
147           if (mpipe->deconstruct)
148               mpipe->deconstruct(buf, mpipe->priv);
149           kfree(buf, mpipe->type);
150     }
151     mpipe->free_count = 0;
152     mpipe->total_count = 0;
153     if (mpipe->array) {
154           kfree(mpipe->array, M_MPIPEARY);
155           mpipe->array = NULL;
156     }
157     lwkt_reltoken(&mpipe->token);
158     lwkt_token_uninit(&mpipe->token);
159 }
160 
161 /*
162  * mpipe support thread for request failures when mpipe_alloc_callback()
163  * is called.
164  *
165  * Only set MPF_QUEUEWAIT if entries are pending in the queue.  If no entries
166  * are pending and a new entry is added, other code will set MPF_QUEUEWAIT
167  * for us.
168  */
169 static void
mpipe_thread(void * arg)170 mpipe_thread(void *arg)
171 {
172     malloc_pipe_t mpipe = arg;
173     struct mpipe_callback *mcb;
174 
175     lwkt_gettoken(&mpipe->token);
176     while ((mpipe->mpflags & MPF_EXITING) == 0) {
177           while (mpipe->free_count &&
178                  (mcb = STAILQ_FIRST(&mpipe->queue)) != NULL) {
179                     STAILQ_REMOVE(&mpipe->queue, mcb, mpipe_callback, entry);
180                     mcb->func(mcb->arg1, mcb->arg2);
181                     kfree(mcb, M_MPIPEARY);
182           }
183           if (STAILQ_FIRST(&mpipe->queue))
184                     mpipe->mpflags |= MPF_QUEUEWAIT;
185           tsleep(&mpipe->queue, 0, "wait", 0);
186     }
187     mpipe->thread = NULL;
188     lwkt_reltoken(&mpipe->token);
189     wakeup(mpipe);
190 }
191 
192 
193 /*
194  * Allocate an entry (inline suppot routine).  The allocation is guarenteed
195  * to return non-NULL up to the nominal count after which it may return NULL.
196  * Note that the implementation is defined to be allowed to block for short
197  * periods of time.
198  *
199  * Use mpipe_alloc_callback() for non-blocking operation with a callback
200  * Use mpipe_alloc_nowait() for non-blocking operation without a callback
201  * Use mpipe_alloc_waitok() for blocking operation & guarenteed non-NULL
202  */
203 static __inline
204 void *
_mpipe_alloc_locked(malloc_pipe_t mpipe,int mfailed)205 _mpipe_alloc_locked(malloc_pipe_t mpipe, int mfailed)
206 {
207     void *buf;
208     int n;
209 
210     if ((n = mpipe->free_count) != 0) {
211           /*
212            * Use a free entry if it exists.
213            */
214           --n;
215           buf = mpipe->array[n];
216           mpipe->array[n] = NULL;       /* sanity check, not absolutely needed */
217           mpipe->free_count = n;
218     } else if (mpipe->total_count >= mpipe->max_count || mfailed) {
219           /*
220            * Return NULL if we have hit our limit
221            */
222           buf = NULL;
223     } else {
224           /*
225            * Otherwise try to malloc() non-blocking.
226            */
227           buf = kmalloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
228           if (buf) {
229               ++mpipe->total_count;
230               if (mpipe->construct)
231                   mpipe->construct(buf, mpipe->priv);
232           }
233     }
234     return(buf);
235 }
236 
237 /*
238  * Nominal non-blocking mpipe allocation
239  */
240 void *
mpipe_alloc_nowait(malloc_pipe_t mpipe)241 mpipe_alloc_nowait(malloc_pipe_t mpipe)
242 {
243     void *buf;
244 
245     lwkt_gettoken(&mpipe->token);
246     buf = _mpipe_alloc_locked(mpipe, 0);
247     lwkt_reltoken(&mpipe->token);
248 
249     return(buf);
250 }
251 
252 /*
253  * non-blocking mpipe allocation with callback for retry.
254  *
255  * If NULL is returned func(arg) is queued and will be called back when
256  * space is likely (but not necessarily) available.
257  *
258  * If non-NULL is returned func(arg) is ignored.
259  */
260 void *
mpipe_alloc_callback(malloc_pipe_t mpipe,void (* func)(void * arg1,void * arg2),void * arg1,void * arg2)261 mpipe_alloc_callback(malloc_pipe_t mpipe, void (*func)(void *arg1, void *arg2),
262                          void *arg1, void *arg2)
263 {
264     struct mpipe_callback *mcb;
265     void *buf;
266 
267     lwkt_gettoken(&mpipe->token);
268     buf = _mpipe_alloc_locked(mpipe, 0);
269     if (buf == NULL) {
270           mcb = kmalloc(sizeof(*mcb), M_MPIPEARY, M_INTWAIT);
271           buf = _mpipe_alloc_locked(mpipe, 0);
272           if (buf == NULL) {
273               mcb->func = func;
274               mcb->arg1 = arg1;
275               mcb->arg2 = arg2;
276               STAILQ_INSERT_TAIL(&mpipe->queue, mcb, entry);
277               mpipe->mpflags |= MPF_QUEUEWAIT;    /* for mpipe_thread() */
278           } else {
279               kfree(mcb, M_MPIPEARY);
280           }
281     }
282     lwkt_reltoken(&mpipe->token);
283 
284     return(buf);
285 }
286 
287 /*
288  * This function can be called to nominally wait until resources are
289  * available and mpipe_alloc_nowait() is likely to return non-NULL.
290  *
291  * NOTE: mpipe_alloc_nowait() can still return NULL.
292  */
293 void
mpipe_wait(malloc_pipe_t mpipe)294 mpipe_wait(malloc_pipe_t mpipe)
295 {
296     if (mpipe->free_count == 0) {
297           lwkt_gettoken(&mpipe->token);
298           while ((mpipe->mpflags & MPF_EXITING) == 0) {
299               if (mpipe->free_count)
300                         break;
301               mpipe->mpflags |= MPF_QUEUEWAIT;
302               tsleep(&mpipe->queue, 0, "wait", 0);
303           }
304           lwkt_reltoken(&mpipe->token);
305     }
306 }
307 
308 /*
309  * Allocate an entry, block until the allocation succeeds.  This may cause
310  * us to block waiting for a prior allocation to be freed.
311  */
312 void *
mpipe_alloc_waitok(malloc_pipe_t mpipe)313 mpipe_alloc_waitok(malloc_pipe_t mpipe)
314 {
315     void *buf;
316     int mfailed;
317 
318     lwkt_gettoken(&mpipe->token);
319     mfailed = 0;
320     while ((buf = _mpipe_alloc_locked(mpipe, mfailed)) == NULL) {
321           /*
322            * Block if we have hit our limit
323            */
324           mpipe->pending = 1;
325           tsleep(mpipe, 0, "mpipe1", 0);
326           mfailed = 1;
327     }
328     lwkt_reltoken(&mpipe->token);
329 
330     return(buf);
331 }
332 
333 /*
334  * Free an entry, unblock any waiters.  Allow NULL.
335  */
336 void
mpipe_free(malloc_pipe_t mpipe,void * buf)337 mpipe_free(malloc_pipe_t mpipe, void *buf)
338 {
339     int n;
340 
341     if (buf == NULL)
342           return;
343 
344     lwkt_gettoken(&mpipe->token);
345     if ((n = mpipe->free_count) < mpipe->ary_count) {
346           /*
347            * Free slot available in free array (LIFO)
348            */
349           mpipe->array[n] = buf;
350           ++mpipe->free_count;
351           if ((mpipe->mpflags & (MPF_CACHEDATA|MPF_NOZERO)) == 0)
352               bzero(buf, mpipe->bytes);
353           if (mpipe->mpflags & MPF_QUEUEWAIT) {
354                     mpipe->mpflags &= ~MPF_QUEUEWAIT;
355                     lwkt_reltoken(&mpipe->token);
356                     wakeup(&mpipe->queue);
357           } else {
358                     lwkt_reltoken(&mpipe->token);
359           }
360           /*
361            * Wakeup anyone blocked in mpipe_alloc_*().
362            */
363           if (mpipe->pending) {
364               mpipe->pending = 0;
365               wakeup(mpipe);
366           }
367     } else {
368           /*
369            * All the free slots are full, free the buffer directly.
370            */
371           --mpipe->total_count;
372           KKASSERT(mpipe->total_count >= mpipe->free_count);
373           if (mpipe->deconstruct)
374               mpipe->deconstruct(buf, mpipe->priv);
375           lwkt_reltoken(&mpipe->token);
376           kfree(buf, mpipe->type);
377     }
378 }
379 
380