1 /*-
2 * Copyright (c) 2007-2015 Matt Macy <mmacy@nextbsd.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 * $FreeBSD$
27 *
28 */
29
30 #ifndef _SYS_BUF_RING_H_
31 #define _SYS_BUF_RING_H_
32
33 #include <machine/cpu.h>
34
35 #if defined(INVARIANTS) && !defined(DEBUG_BUFRING)
36 #define DEBUG_BUFRING 1
37 #endif
38
39 #ifdef DEBUG_BUFRING
40 #include <sys/lock.h>
41 #include <sys/mutex.h>
42 #endif
43
44 /* cache line align buf ring entries */
45 #define BR_FLAGS_ALIGNED 0x1
46
47 struct br_entry_ {
48 volatile void *bre_ptr;
49 };
50
51 struct buf_ring {
52 volatile uint32_t br_prod_head;
53 volatile uint32_t br_prod_tail;
54 int br_prod_size;
55 int br_prod_mask;
56 uint64_t br_drops;
57 /* cache line aligned to avoid cache line invalidate traffic
58 * between consumer and producer (false sharing)
59 */
60 volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE);
61 volatile uint32_t br_cons_tail;
62 int br_cons_size;
63 int br_cons_mask;
64 #ifdef DEBUG_BUFRING
65 struct mtx *br_lock;
66 #endif
67 /* cache line aligned to avoid false sharing with other data structures
68 */
69 int br_flags __aligned(CACHE_LINE_SIZE);
70 struct br_entry_ br_ring[0] __aligned(CACHE_LINE_SIZE);
71 };
72
73 /*
74 * ring entry accessors to allow us to make ring entry
75 * alignment determined at runtime
76 */
77 static __inline void *
br_entry_get(struct buf_ring * br,int i)78 br_entry_get(struct buf_ring *br, int i)
79 {
80 volatile void *ent;
81
82 if (br->br_flags & BR_FLAGS_ALIGNED)
83 ent = br->br_ring[i*(CACHE_LINE_SIZE/sizeof(caddr_t))].bre_ptr;
84 else
85 ent = br->br_ring[i].bre_ptr;
86 return ((void *)(uintptr_t)ent);
87 }
88
89 static __inline void
br_entry_set(struct buf_ring * br,int i,void * buf)90 br_entry_set(struct buf_ring *br, int i, void *buf)
91 {
92
93 if (br->br_flags & BR_FLAGS_ALIGNED)
94 br->br_ring[i*(CACHE_LINE_SIZE/sizeof(caddr_t))].bre_ptr = buf;
95 else
96 br->br_ring[i].bre_ptr = buf;
97 }
98
99 /*
100 * Many architectures other than x86 permit speculative re-ordering
101 * of loads. Unfortunately, atomic_load_acq_32() is comparatively
102 * expensive so we'd rather elide it if possible.
103 */
104 #if defined(__i386__) || defined(__amd64__)
105 #define ORDERED_LOAD_32(x) (*x)
106 #else
107 #define ORDERED_LOAD_32(x) atomic_load_acq_32((x))
108 #endif
109
110 /*
111 * Multi-producer safe lock-free ring buffer enqueue
112 *
113 * Most architectures do not support the atomic update of multiple
114 * discontiguous locations. So it is not possible to atomically update
115 * the producer index and ring buffer entry. To side-step this limitation
116 * we split update in to 3 steps:
117 * 1) atomically acquiring an index
118 * 2) updating the corresponding ring entry
119 * 3) making the update available to the consumer
120 * In order to split the index update in to an acquire and release
121 * phase there are _two_ producer indexes. 'prod_head' is used for
122 * step 1) and is thus only used by the enqueue itself. 'prod_tail'
123 * is used for step 3) to signal to the consumer that the update is
124 * complete. To guarantee memory ordering the update of 'prod_tail' is
125 * done with a atomic_store_rel_32(...) and the corresponding
126 * initial read of 'prod_tail' by the dequeue functions is done with
127 * an atomic_load_acq_32(...).
128 *
129 * Regarding memory ordering - there are five variables in question:
130 * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}]
131 * It's easiest examine correctness by considering the consequence of
132 * reading a stale value or having an update become visible prior to
133 * preceding writes.
134 *
135 * - prod_head: this is only read by the enqueue routine, if the latter were to
136 * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32)
137 * would fail. However, the implied memory barrier in cmpxchg would cause the
138 * subsequent read of prod_head to read the up-to-date value permitting the
139 * cmpxchg to succeed the second time.
140 *
141 * - prod_tail: This value is used by dequeue to determine the effective
142 * producer index. On architectures with weaker memory ordering than x86 it
143 * needs special handling. In enqueue it needs to be updated with
144 * atomic_store_rel_32() (i.e. a write memory barrier before update) to
145 * guarantee that the new ring value is committed to memory before it is
146 * made available by prod_tail. In dequeue to guarantee that it is read before
147 * br_ring[cons_head] it needs to be read with atomic_load_acq_32().
148 *
149 * - cons_head: this value is used only by dequeue, it is either updated
150 * atomically (dequeue_mc) or protected by a mutex (dequeue_sc).
151 *
152 * - cons_tail: This is used to communicate the latest consumer index between
153 * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
154 * to fail erroneously. To avoid a load being re-ordered after a store (and
155 * thus permitting enqueue to store a new value before the old one has been
156 * consumed) it is updated with an atomic_store_rel_32() in deqeueue.
157 *
158 * - ring[idx] : Updates to this value need to reach memory before the subsequent
159 * update to prod_tail does. Reads need to happen before subsequent updates to
160 * cons_tail.
161 *
162 * Some implementation notes:
163 * - Much like a simpler single-producer single consumer ring buffer,
164 * the producer can not produce faster than the consumer. Hence the
165 * check of 'prod_head' + 1 against 'cons_tail'.
166 *
167 * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
168 * calculate the next index is slightly cheaper than a modulo but
169 * requires the ring to be power-of-2 sized.
170 *
171 * - The critical_enter() / critical_exit() are not required for
172 * correctness. They prevent updates from stalling by having a producer be
173 * preempted after updating 'prod_head' but before updating 'prod_tail'.
174 *
175 * - The "while (br->br_prod_tail != prod_head)"
176 * check assures in order completion (probably not strictly necessary,
177 * but makes it easier to reason about) and allows us to update
178 * 'prod_tail' without a cmpxchg / LOCK prefix.
179 *
180 */
181 static __inline int
buf_ring_enqueue(struct buf_ring * br,void * buf)182 buf_ring_enqueue(struct buf_ring *br, void *buf)
183 {
184 uint32_t prod_head, prod_next, cons_tail;
185 #ifdef DEBUG_BUFRING
186 int i;
187 for (i = br->br_cons_head; i != br->br_prod_head;
188 i = ((i + 1) & br->br_cons_mask))
189 if(br->br_ring[i].bre_ptr == buf)
190 panic("buf=%p already enqueue at %d prod=%d cons=%d",
191 buf, i, br->br_prod_tail, br->br_cons_tail);
192 #endif
193 critical_enter();
194 do {
195
196 prod_head = br->br_prod_head;
197 prod_next = (prod_head + 1) & br->br_prod_mask;
198 cons_tail = br->br_cons_tail;
199
200 if (prod_next == cons_tail) {
201 /* ensure that we only return ENOBUFS
202 * if the latest value matches what we read
203 */
204 if (prod_head != atomic_load_acq_32(&br->br_prod_head) ||
205 cons_tail != atomic_load_acq_32(&br->br_cons_tail))
206 continue;
207
208 br->br_drops++;
209 critical_exit();
210 return (ENOBUFS);
211 }
212 } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
213 #ifdef DEBUG_BUFRING
214 if (br->br_ring[prod_head].bre_ptr != NULL)
215 panic("dangling value in enqueue");
216 #endif
217 br->br_ring[prod_head].bre_ptr = buf;
218
219 /*
220 * If there are other enqueues in progress
221 * that preceded us, we need to wait for them
222 * to complete
223 * re-ordering of reads would not effect correctness
224 */
225 while (br->br_prod_tail != prod_head)
226 cpu_spinwait();
227 /* ensure that the ring update reaches memory before the new
228 * value of prod_tail
229 */
230 atomic_store_rel_32(&br->br_prod_tail, prod_next);
231 critical_exit();
232 return (0);
233 }
234
235 /*
236 * multi-consumer safe dequeue
237 *
238 */
239 static __inline void *
buf_ring_dequeue_mc(struct buf_ring * br)240 buf_ring_dequeue_mc(struct buf_ring *br)
241 {
242 uint32_t cons_head, cons_next;
243 volatile void *buf;
244
245 critical_enter();
246 do {
247 /*
248 * prod_tail must be read before br_ring[cons_head] is
249 * and the atomic_cmpset_acq_32 on br_cons_head should
250 * enforce that
251 */
252 cons_head = br->br_cons_head;
253 if (cons_head == br->br_prod_tail) {
254 critical_exit();
255 return (NULL);
256 }
257 cons_next = (cons_head + 1) & br->br_cons_mask;
258 } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
259
260 /* ensure that the read completes before either of the
261 * subsequent stores
262 */
263 buf = br->br_ring[cons_head].bre_ptr;
264 /* guarantee that the load completes before we update cons_tail */
265 br->br_ring[cons_head].bre_ptr = NULL;
266
267 /*
268 * If there are other dequeues in progress
269 * that preceded us, we need to wait for them
270 * to complete - no memory barrier needed as
271 * re-ordering shouldn't effect correctness or
272 * progress
273 */
274 while (br->br_cons_tail != cons_head)
275 cpu_spinwait();
276 /*
277 * assure that the ring entry is read before
278 * marking the entry as free by updating cons_tail
279 */
280 atomic_store_rel_32(&br->br_cons_tail, cons_next);
281 critical_exit();
282
283 return ((void *)(uintptr_t)buf);
284 }
285
286 /*
287 * single-consumer dequeue
288 * use where dequeue is protected by a lock
289 * e.g. a network driver's tx queue lock
290 */
291 static __inline void *
buf_ring_dequeue_sc(struct buf_ring * br)292 buf_ring_dequeue_sc(struct buf_ring *br)
293 {
294 uint32_t cons_head, cons_next;
295 #ifdef PREFETCH_DEFINED
296 uint32_t cons_next_next;
297 uint32_t prod_tail;
298 #endif
299 volatile void *buf;
300
301 /*
302 * prod_tail tells whether or not br_ring[cons_head] is valid
303 * thus we must guarantee that it is read first
304 */
305 cons_head = br->br_cons_head;
306 if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
307 return (NULL);
308
309 cons_next = (cons_head + 1) & br->br_cons_mask;
310 #ifdef PREFETCH_DEFINED
311 /*
312 * If prod_tail is stale we will prefetch the wrong value - but this is safe
313 * as cache coherence (should) ensure that the when the value is loaded for
314 * actual use it is fetched from main memory
315 */
316 prod_tail = br->br_prod_tail;
317 cons_next_next = (cons_head + 2) & br->br_cons_mask;
318 if (cons_next != prod_tail) {
319 prefetch(br->br_ring[cons_next].bre_ptr);
320 if (cons_next_next != prod_tail)
321 prefetch(br->br_ring[cons_next_next].bre_ptr);
322 }
323 #endif
324 br->br_cons_head = cons_next;
325 buf = br->br_ring[cons_head].bre_ptr;
326 /* guarantee that the load completes before we update cons_tail */
327 br->br_ring[cons_head].bre_ptr = NULL;
328 #ifdef DEBUG_BUFRING
329 if (!mtx_owned(br->br_lock))
330 panic("lock not held on single consumer dequeue");
331 if (br->br_cons_tail != cons_head)
332 panic("inconsistent list cons_tail=%d cons_head=%d",
333 br->br_cons_tail, cons_head);
334 #endif
335 atomic_store_rel_32(&br->br_cons_tail, cons_next);
336
337 return ((void *)(uintptr_t)buf);
338 }
339
340 /*
341 * single-consumer advance after a peek
342 * use where it is protected by a lock
343 * e.g. a network driver's tx queue lock
344 */
345 static __inline void
buf_ring_advance_sc(struct buf_ring * br)346 buf_ring_advance_sc(struct buf_ring *br)
347 {
348 uint32_t cons_head, cons_next;
349 uint32_t prod_tail;
350
351 cons_head = br->br_cons_head;
352 prod_tail = br->br_prod_tail;
353
354 cons_next = (cons_head + 1) & br->br_cons_mask;
355 if (cons_head == prod_tail)
356 return;
357 br->br_cons_head = cons_next;
358
359 /*
360 * Storing NULL here serves two purposes:
361 * 1) it assures that the load of ring[cons_head] has completed
362 * (only the most perverted architecture or compiler would
363 * consider re-ordering a = *x; *x = b)
364 * 2) it allows us to enforce global ordering of the cons_tail
365 * update with an atomic_store_rel_32
366 */
367 br->br_ring[cons_head].bre_ptr = NULL;
368 atomic_store_rel_32(&br->br_cons_tail, cons_next);
369 }
370
371 /*
372 * Used to return a buffer (most likely already there)
373 * to the top od the ring. The caller should *not*
374 * have used any dequeue to pull it out of the ring
375 * but instead should have used the peek() function.
376 * This is normally used where the transmit queue
377 * of a driver is full, and an mubf must be returned.
378 * Most likely whats in the ring-buffer is what
379 * is being put back (since it was not removed), but
380 * sometimes the lower transmit function may have
381 * done a pullup or other function that will have
382 * changed it. As an optimzation we always put it
383 * back (since jhb says the store is probably cheaper),
384 * if we have to do a multi-queue version we will need
385 * the compare and an atomic.
386 *
387 */
388 static __inline void
buf_ring_putback_sc(struct buf_ring * br,void * new)389 buf_ring_putback_sc(struct buf_ring *br, void *new)
390 {
391 KASSERT(br->br_cons_head != br->br_prod_tail,
392 ("Buf-Ring has none in putback")) ;
393 br->br_ring[br->br_cons_head].bre_ptr = new;
394 }
395
396 /*
397 * return a pointer to the first entry in the ring
398 * without modifying it, or NULL if the ring is empty
399 * race-prone if not protected by a lock
400 */
401 static __inline void *
buf_ring_peek(struct buf_ring * br)402 buf_ring_peek(struct buf_ring *br)
403 {
404 uint32_t cons_head;
405 #ifdef DEBUG_BUFRING
406 if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
407 panic("lock not held on single consumer dequeue");
408 #endif
409 cons_head = br->br_cons_head;
410 /*
411 * for correctness prod_tail must be read before ring[cons_head]
412 */
413
414 if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
415 return (NULL);
416
417 /* ensure that the ring load completes before
418 * exposing it to any destructive updates
419 */
420 return ((void *)(uintptr_t)br->br_ring[cons_head].bre_ptr);
421 }
422
423 static __inline int
buf_ring_full(struct buf_ring * br)424 buf_ring_full(struct buf_ring *br)
425 {
426 /* br_cons_tail may be stale but the consumer understands that this is
427 * only a point in time snapshot
428 */
429 return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
430 }
431
432 static __inline int
buf_ring_empty(struct buf_ring * br)433 buf_ring_empty(struct buf_ring *br)
434 {
435 /* br_prod_tail may be stale but the consumer understands that this is
436 * only a point in time snapshot
437 */
438
439 return (br->br_cons_head == br->br_prod_tail);
440 }
441
442 static __inline int
buf_ring_count(struct buf_ring * br)443 buf_ring_count(struct buf_ring *br)
444 {
445 /* br_cons_tail and br_prod_tail may be stale but the consumer
446 * understands that this is only a point in time snapshot
447 */
448
449 return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
450 & br->br_prod_mask);
451 }
452
453 struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
454 struct mtx *);
455 struct buf_ring *buf_ring_aligned_alloc(int count, struct malloc_type *type, int flags,
456 struct mtx *);
457 void buf_ring_free(struct buf_ring *br, struct malloc_type *type);
458
459
460
461 #endif
462