xref: /NextBSD/sys/sys/buf_ring.h (revision d3c636c9b89dbfe48820ea0171c6f6ab6e41d24a)
1 /*-
2  * Copyright (c) 2007-2015 Matt Macy <mmacy@nextbsd.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD$
27  *
28  */
29 
30 #ifndef	_SYS_BUF_RING_H_
31 #define	_SYS_BUF_RING_H_
32 
33 #include <machine/cpu.h>
34 
35 #if defined(INVARIANTS) && !defined(DEBUG_BUFRING)
36 #define DEBUG_BUFRING 1
37 #endif
38 
39 #ifdef DEBUG_BUFRING
40 #include <sys/lock.h>
41 #include <sys/mutex.h>
42 #endif
43 
44 /* cache line align buf ring entries */
45 #define BR_FLAGS_ALIGNED 0x1
46 
/*
 * A single ring slot.
 *
 * NOTE(review): as declared, 'volatile' qualifies the object pointed to,
 * not the slot itself ("pointer to volatile void", not "volatile pointer").
 * Confirm that this is the intended qualification.
 */
struct br_entry_ {
	volatile void *bre_ptr;		/* buffer stored in this slot, or NULL */
};
50 
51 struct buf_ring {
52 	volatile uint32_t	br_prod_head;
53 	volatile uint32_t	br_prod_tail;
54 	int              	br_prod_size;
55 	int              	br_prod_mask;
56 	uint64_t		br_drops;
57 	/* cache line aligned to avoid cache line invalidate traffic
58 	 * between consumer and producer (false sharing)
59 	 */
60 	volatile uint32_t	br_cons_head __aligned(CACHE_LINE_SIZE);
61 	volatile uint32_t	br_cons_tail;
62 	int		 	br_cons_size;
63 	int              	br_cons_mask;
64 #ifdef DEBUG_BUFRING
65 	struct mtx		*br_lock;
66 #endif
67 	/* cache line aligned to avoid false sharing with other data structures
68 	 */
69 	int			br_flags  __aligned(CACHE_LINE_SIZE);
70 	struct br_entry_	br_ring[0] __aligned(CACHE_LINE_SIZE);
71 };
72 
73 /*
74  * ring entry accessors to allow us to make ring entry
75  * alignment determined at runtime
76  */
77 static __inline void *
br_entry_get(struct buf_ring * br,int i)78 br_entry_get(struct buf_ring *br, int i)
79 {
80 	volatile void *ent;
81 
82 	if (br->br_flags & BR_FLAGS_ALIGNED)
83 		ent = br->br_ring[i*(CACHE_LINE_SIZE/sizeof(caddr_t))].bre_ptr;
84 	else
85 		ent = br->br_ring[i].bre_ptr;
86 	return ((void *)(uintptr_t)ent);
87 }
88 
89 static __inline void
br_entry_set(struct buf_ring * br,int i,void * buf)90 br_entry_set(struct buf_ring *br, int i, void *buf)
91 {
92 
93 	if (br->br_flags & BR_FLAGS_ALIGNED)
94 		br->br_ring[i*(CACHE_LINE_SIZE/sizeof(caddr_t))].bre_ptr = buf;
95 	else
96 		br->br_ring[i].bre_ptr = buf;
97 }
98 
99 /*
100  * Many architectures other than x86 permit speculative re-ordering
101  * of loads. Unfortunately, atomic_load_acq_32() is comparatively
102  * expensive so we'd rather elide it if possible.
103  */
/*
 * ORDERED_LOAD_32(x): load the 32-bit value that 'x' points to.  On
 * i386/amd64 this is a plain dereference; elsewhere it is
 * atomic_load_acq_32().  The argument is parenthesised in both forms so
 * that pointer expressions such as "p + 1" are dereferenced as a whole.
 */
#if defined(__i386__) || defined(__amd64__)
#define ORDERED_LOAD_32(x) (*(x))
#else
#define ORDERED_LOAD_32(x) atomic_load_acq_32((x))
#endif
109 
110 /*
111  * Multi-producer safe lock-free ring buffer enqueue
112  *
 * Most architectures do not support the atomic update of multiple
 * discontiguous locations. So it is not possible to atomically update
 * the producer index and ring buffer entry. To side-step this limitation
 * we split the update into 3 steps:
 *      1) atomically acquiring an index
 *      2) updating the corresponding ring entry
 *      3) making the update available to the consumer
 * In order to split the index update into an acquire and release
 * phase there are _two_ producer indexes. 'prod_head' is used for
 * step 1) and is thus only used by the enqueue itself. 'prod_tail'
 * is used for step 3) to signal to the consumer that the update is
 * complete. To guarantee memory ordering the update of 'prod_tail' is
 * done with an atomic_store_rel_32(...) and the corresponding
 * initial read of 'prod_tail' by the dequeue functions is done with
 * an atomic_load_acq_32(...).
128  *
 * Regarding memory ordering - there are five variables in question:
 * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}]
 * It's easiest to examine correctness by considering the consequence of
 * reading a stale value or having an update become visible prior to
 * preceding writes.
134  *
135  * - prod_head: this is only read by the enqueue routine, if the latter were to
136  *   initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32)
137  *   would fail. However, the implied memory barrier in cmpxchg would cause the
138  *   subsequent read of prod_head to read the up-to-date value permitting the
139  *   cmpxchg to succeed the second time.
140  *
141  * - prod_tail: This value is used by dequeue to determine the effective
142  *   producer index. On architectures with weaker memory ordering than x86 it
143  *   needs special handling. In enqueue it needs to be updated with
144  *   atomic_store_rel_32() (i.e. a write memory barrier before update) to
145  *   guarantee that the new ring value is committed to memory before it is
146  *   made available by prod_tail. In dequeue to guarantee that it is read before
147  *   br_ring[cons_head] it needs to be read with atomic_load_acq_32().
148  *
149  * - cons_head: this value is used only by dequeue, it is either updated
150  *   atomically (dequeue_mc) or protected by a mutex (dequeue_sc).
151  *
 * - cons_tail: This is used to communicate the latest consumer index between
 *   dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
 *   to fail erroneously. To avoid a load being re-ordered after a store (and
 *   thus permitting enqueue to store a new value before the old one has been
 *   consumed) it is updated with an atomic_store_rel_32() in dequeue.
157  *
158  * - ring[idx] : Updates to this value need to reach memory before the subsequent
159  *   update to prod_tail does. Reads need to happen before subsequent updates to
160  *   cons_tail.
161  *
162  * Some implementation notes:
163  * - Much like a simpler single-producer single consumer ring buffer,
164  *   the producer can not produce faster than the consumer. Hence the
165  *   check of 'prod_head' + 1 against 'cons_tail'.
166  *
167  * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
168  *   calculate the next index is slightly cheaper than a modulo but
169  *   requires the ring to be power-of-2 sized.
170  *
171  * - The critical_enter() / critical_exit() are not required for
172  *   correctness. They prevent updates from stalling by having a producer be
173  *   preempted after updating 'prod_head' but before updating 'prod_tail'.
174  *
175  * - The "while (br->br_prod_tail != prod_head)"
176  *   check assures in order completion (probably not strictly necessary,
177  *   but makes it easier to reason about) and allows us to update
178  *   'prod_tail' without a cmpxchg / LOCK prefix.
179  *
180  */
181 static __inline int
buf_ring_enqueue(struct buf_ring * br,void * buf)182 buf_ring_enqueue(struct buf_ring *br, void *buf)
183 {
184 	uint32_t prod_head, prod_next, cons_tail;
185 #ifdef DEBUG_BUFRING
186 	int i;
187 	for (i = br->br_cons_head; i != br->br_prod_head;
188 	     i = ((i + 1) & br->br_cons_mask))
189 		if(br->br_ring[i].bre_ptr == buf)
190 			panic("buf=%p already enqueue at %d prod=%d cons=%d",
191 			    buf, i, br->br_prod_tail, br->br_cons_tail);
192 #endif
193 	critical_enter();
194 	do {
195 
196 		prod_head = br->br_prod_head;
197 		prod_next = (prod_head + 1) & br->br_prod_mask;
198 		cons_tail = br->br_cons_tail;
199 
200 		if (prod_next == cons_tail) {
201 			/* ensure that we only return ENOBUFS
202 			 * if the latest value matches what we read
203 			 */
204 			if (prod_head != atomic_load_acq_32(&br->br_prod_head) ||
205 			    cons_tail != atomic_load_acq_32(&br->br_cons_tail))
206 				continue;
207 
208 			br->br_drops++;
209 			critical_exit();
210 			return (ENOBUFS);
211 		}
212 	} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
213 #ifdef DEBUG_BUFRING
214 	if (br->br_ring[prod_head].bre_ptr != NULL)
215 		panic("dangling value in enqueue");
216 #endif
217 	br->br_ring[prod_head].bre_ptr = buf;
218 
219 	/*
220 	 * If there are other enqueues in progress
221 	 * that preceded us, we need to wait for them
222 	 * to complete
223 	 * re-ordering of reads would not effect correctness
224 	 */
225 	while (br->br_prod_tail != prod_head)
226 		cpu_spinwait();
227 	/* ensure  that the ring update reaches memory before the new
228 	 * value of prod_tail
229 	 */
230 	atomic_store_rel_32(&br->br_prod_tail, prod_next);
231 	critical_exit();
232 	return (0);
233 }
234 
235 /*
236  * multi-consumer safe dequeue
237  *
238  */
239 static __inline void *
buf_ring_dequeue_mc(struct buf_ring * br)240 buf_ring_dequeue_mc(struct buf_ring *br)
241 {
242 	uint32_t cons_head, cons_next;
243 	volatile void *buf;
244 
245 	critical_enter();
246 	do {
247 		/*
248 		 * prod_tail must be read before br_ring[cons_head] is
249 		 * and the atomic_cmpset_acq_32 on br_cons_head should
250 		 * enforce that
251 		 */
252 		cons_head = br->br_cons_head;
253 		if (cons_head == br->br_prod_tail) {
254 			critical_exit();
255 			return (NULL);
256 		}
257 		cons_next = (cons_head + 1) & br->br_cons_mask;
258 	} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
259 
260 	/* ensure that the read completes before either of the
261 	 * subsequent stores
262 	 */
263 	buf = br->br_ring[cons_head].bre_ptr;
264 	/* guarantee that the load completes before we update cons_tail */
265 	br->br_ring[cons_head].bre_ptr = NULL;
266 
267 	/*
268 	 * If there are other dequeues in progress
269 	 * that preceded us, we need to wait for them
270 	 * to complete - no memory barrier needed as
271 	 * re-ordering shouldn't effect correctness or
272 	 * progress
273 	 */
274 	while (br->br_cons_tail != cons_head)
275 		cpu_spinwait();
276 	/*
277 	 * assure that the ring entry is read before
278 	 * marking the entry as free by updating cons_tail
279 	 */
280 	atomic_store_rel_32(&br->br_cons_tail, cons_next);
281 	critical_exit();
282 
283 	return ((void *)(uintptr_t)buf);
284 }
285 
286 /*
287  * single-consumer dequeue
288  * use where dequeue is protected by a lock
289  * e.g. a network driver's tx queue lock
290  */
291 static __inline void *
buf_ring_dequeue_sc(struct buf_ring * br)292 buf_ring_dequeue_sc(struct buf_ring *br)
293 {
294 	uint32_t cons_head, cons_next;
295 #ifdef PREFETCH_DEFINED
296 	uint32_t cons_next_next;
297 	uint32_t prod_tail;
298 #endif
299 	volatile void *buf;
300 
301 	/*
302 	 * prod_tail tells whether or not br_ring[cons_head] is valid
303 	 * thus we must guarantee that it is read first
304 	 */
305 	cons_head = br->br_cons_head;
306 	if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
307 		return (NULL);
308 
309 	cons_next = (cons_head + 1) & br->br_cons_mask;
310 #ifdef PREFETCH_DEFINED
311 	/*
312 	 * If prod_tail is stale we will prefetch the wrong value - but this is safe
313 	 * as cache coherence (should) ensure that the when the value is loaded for
314 	 * actual use it is fetched from main memory
315 	 */
316 	prod_tail = br->br_prod_tail;
317 	cons_next_next = (cons_head + 2) & br->br_cons_mask;
318 	if (cons_next != prod_tail) {
319 		prefetch(br->br_ring[cons_next].bre_ptr);
320 		if (cons_next_next != prod_tail)
321 			prefetch(br->br_ring[cons_next_next].bre_ptr);
322 	}
323 #endif
324 	br->br_cons_head = cons_next;
325 	buf = br->br_ring[cons_head].bre_ptr;
326 	/* guarantee that the load completes before we update cons_tail */
327 	br->br_ring[cons_head].bre_ptr = NULL;
328 #ifdef DEBUG_BUFRING
329 	if (!mtx_owned(br->br_lock))
330 		panic("lock not held on single consumer dequeue");
331 	if (br->br_cons_tail != cons_head)
332 		panic("inconsistent list cons_tail=%d cons_head=%d",
333 		    br->br_cons_tail, cons_head);
334 #endif
335 	atomic_store_rel_32(&br->br_cons_tail, cons_next);
336 
337 	return ((void *)(uintptr_t)buf);
338 }
339 
340 /*
341  * single-consumer advance after a peek
342  * use where it is protected by a lock
343  * e.g. a network driver's tx queue lock
344  */
345 static __inline void
buf_ring_advance_sc(struct buf_ring * br)346 buf_ring_advance_sc(struct buf_ring *br)
347 {
348 	uint32_t cons_head, cons_next;
349 	uint32_t prod_tail;
350 
351 	cons_head = br->br_cons_head;
352 	prod_tail = br->br_prod_tail;
353 
354 	cons_next = (cons_head + 1) & br->br_cons_mask;
355 	if (cons_head == prod_tail)
356 		return;
357 	br->br_cons_head = cons_next;
358 
359 	/*
360 	 * Storing NULL here serves two purposes:
361 	 * 1) it assures that the load of ring[cons_head] has completed
362 	 *    (only the most perverted architecture or compiler would
363 	 *    consider re-ordering a = *x; *x = b)
364 	 * 2) it allows us to enforce global ordering of the cons_tail
365 	 *    update with an atomic_store_rel_32
366 	 */
367 	br->br_ring[cons_head].bre_ptr = NULL;
368 	atomic_store_rel_32(&br->br_cons_tail, cons_next);
369 }
370 
/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring-buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper),
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
 *
 */
388 static __inline void
buf_ring_putback_sc(struct buf_ring * br,void * new)389 buf_ring_putback_sc(struct buf_ring *br, void *new)
390 {
391 	KASSERT(br->br_cons_head != br->br_prod_tail,
392 		("Buf-Ring has none in putback")) ;
393 	br->br_ring[br->br_cons_head].bre_ptr = new;
394 }
395 
396 /*
397  * return a pointer to the first entry in the ring
398  * without modifying it, or NULL if the ring is empty
399  * race-prone if not protected by a lock
400  */
401 static __inline void *
buf_ring_peek(struct buf_ring * br)402 buf_ring_peek(struct buf_ring *br)
403 {
404 	uint32_t cons_head;
405 #ifdef DEBUG_BUFRING
406 	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
407 		panic("lock not held on single consumer dequeue");
408 #endif
409 	cons_head = br->br_cons_head;
410 	/*
411 	 * for correctness prod_tail must be read before ring[cons_head]
412 	 */
413 
414 	if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
415 		return (NULL);
416 
417 	/* ensure that the ring load completes before
418 	 * exposing it to any destructive updates
419 	 */
420 	return ((void *)(uintptr_t)br->br_ring[cons_head].bre_ptr);
421 }
422 
423 static __inline int
buf_ring_full(struct buf_ring * br)424 buf_ring_full(struct buf_ring *br)
425 {
426 	/* br_cons_tail may be stale but the consumer understands that this is
427 	* only a point in time snapshot
428 	*/
429 	return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
430 }
431 
432 static __inline int
buf_ring_empty(struct buf_ring * br)433 buf_ring_empty(struct buf_ring *br)
434 {
435 	/*  br_prod_tail may be stale but the consumer understands that this is
436 	*  only a point in time snapshot
437 	*/
438 
439 	return (br->br_cons_head == br->br_prod_tail);
440 }
441 
442 static __inline int
buf_ring_count(struct buf_ring * br)443 buf_ring_count(struct buf_ring *br)
444 {
445 	/*  br_cons_tail and br_prod_tail may be stale but the consumer
446 	 * understands that this is only a point in time snapshot
447 	 */
448 
449 	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
450 	    & br->br_prod_mask);
451 }
452 
/*
 * Ring allocation and release.  These are only declared here; their
 * definitions are not part of this header.
 */
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
struct buf_ring *buf_ring_aligned_alloc(int count, struct malloc_type *type,
    int flags, struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
458 
459 
460 
461 #endif
462