1 /*-
2 * Copyright (c) 2007-2015 Matt Macy <mmacy@nextbsd.org>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 */
26
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
29
30
31 #include <sys/param.h>
32 #include <sys/systm.h>
33 #include <sys/counter.h>
34 #include <sys/kernel.h>
35 #include <sys/libkern.h>
36 #include <sys/malloc.h>
37 #include <sys/ktr.h>
38 #include <sys/pcpu.h>
39 #include <sys/proc.h>
40 #include <sys/sched.h>
41 #include <sys/sysctl.h>
42
43 #include <sys/buf_ring.h>
44 #include <sys/buf_ring_sc.h>
45
/*
 * Status codes used inside this file.  EOWNED is the internal result
 * buf_ring_sc_enqueue() uses to remember that it acquired the consumer lock;
 * ESTALLED is only referenced from a disabled (#if 0) path.
 */
#define ESTALLED 254 /* consumer is stalled */
#define EOWNED 255 /* consumer lock acquired */

/*
 * Number of pointer-sized slots in one cache line.  Rings created with
 * BR_FLAGS_ALIGNED place logical entry i at slot i * ALIGN_SCALE.
 */
#define ALIGN_SCALE (CACHE_LINE_SIZE/sizeof(caddr_t))
50
51
52 static struct buf_ring *
buf_ring_alloc_(int count,struct malloc_type * type,int flags,struct mtx * lock,int brflags)53 buf_ring_alloc_(int count, struct malloc_type *type, int flags, struct mtx *lock, int brflags)
54 {
55 struct buf_ring *br;
56 int alloc_count;
57
58 KASSERT(powerof2(count), ("buf ring must be size power of 2"));
59 alloc_count = (brflags & BR_FLAGS_ALIGNED) ? (count * ALIGN_SCALE) : count;
60
61 br = malloc(sizeof(struct buf_ring) + alloc_count*sizeof(caddr_t),
62 type, flags|M_ZERO);
63 if (br == NULL)
64 return (NULL);
65 br->br_flags = brflags;
66 #ifdef DEBUG_BUFRING
67 br->br_lock = lock;
68 #endif
69 br->br_prod_size = br->br_cons_size = count;
70 br->br_prod_mask = br->br_cons_mask = count-1;
71 br->br_prod_head = br->br_cons_head = 0;
72 br->br_prod_tail = br->br_cons_tail = 0;
73
74 return (br);
75 }
76
/*
 * Allocate a buf_ring with 'count' entries and no BR_FLAGS_* set.
 * Arguments are forwarded unchanged to buf_ring_alloc_(); returns NULL if
 * the allocation fails.
 */
struct buf_ring *
buf_ring_alloc(int count, struct malloc_type *type, int flags, struct mtx *lock)
{

	return (buf_ring_alloc_(count, type, flags, lock, 0));
}
83
/*
 * Allocate a buf_ring with 'count' entries and BR_FLAGS_ALIGNED set, which
 * makes buf_ring_alloc_() reserve ALIGN_SCALE slots per entry.
 * Returns NULL if the allocation fails.
 */
struct buf_ring *
buf_ring_aligned_alloc(int count, struct malloc_type *type, int flags, struct mtx *lock)
{

	return (buf_ring_alloc_(count, type, flags, lock, BR_FLAGS_ALIGNED));
}
90
/*
 * Release a ring obtained from buf_ring_alloc()/buf_ring_aligned_alloc().
 * 'type' is handed straight to free(9).  Nothing stored in the ring is
 * touched here.
 */
void
buf_ring_free(struct buf_ring *br, struct malloc_type *type)
{

	free(br, type);
}
97
98 /*
99 * buf_ring_sc definitions follow
100 */
101
/*
 * Debug event counters.  Unless BRSC_DEBUG_COUNTERS is defined by the build
 * they are enabled exactly when INVARIANTS is.
 */
#ifndef BRSC_DEBUG_COUNTERS
#ifdef INVARIANTS
#define BRSC_DEBUG_COUNTERS 1
#else
#define BRSC_DEBUG_COUNTERS 0
#endif /* INVARIANTS */
#endif /* !BRSC_DEBUG_COUNTERS */

#if BRSC_DEBUG_COUNTERS
/* Read-only counters exported under the net.brsc sysctl node. */
static SYSCTL_NODE(_net, OID_AUTO, brsc, CTLFLAG_RD, 0,
    "buf_ring_stats");

/* File-wide totals (not per ring), bumped through DBG_COUNTER_INC(). */
static int brsc_drains;
static int brsc_drain_handled;
static int brsc_nqs;
static int brsc_nq_pendings;
static int brsc_nq_entry_sets;
/* NOTE(review): nothing visible in this file increments this counter. */
static int brsc_nq_entry_putbacks;
static int brsc_nq_entry_set_nulls;
static int brsc_nq_entry_gets;
static int brsc_nq_entry_advances;

static int brsc_nq_enobufs1;
static int brsc_nq_enobufs2;
static int brsc_nq_eowned;
/* NOTE(review): nothing visible in this file increments this counter. */
static int brsc_nq_domain_eq;
static int brsc_deferred;


SYSCTL_INT(_net_brsc, OID_AUTO, drain, CTLFLAG_RD,
    &brsc_drains, 0, "# times drain was called");
SYSCTL_INT(_net_brsc, OID_AUTO, drain_handled, CTLFLAG_RD,
    &brsc_drain_handled, 0, "# times drain handler was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nqs, CTLFLAG_RD,
    &brsc_nqs, 0, "# times enqueue was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_pendings, CTLFLAG_RD,
    &brsc_nq_pendings, 0, "# times pending was set in nq");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_sets, CTLFLAG_RD,
    &brsc_nq_entry_sets, 0, "# times entry_set was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_gets, CTLFLAG_RD,
    &brsc_nq_entry_gets, 0, "# times entry_get was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_putbacks, CTLFLAG_RD,
    &brsc_nq_entry_putbacks, 0, "# times entry_putback was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_advances, CTLFLAG_RD,
    &brsc_nq_entry_advances, 0, "# times entry_advance was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_set_nulls, CTLFLAG_RD,
    &brsc_nq_entry_set_nulls, 0, "# times entry_set was called with null");
SYSCTL_INT(_net_brsc, OID_AUTO, enobufs1, CTLFLAG_RD,
    &brsc_nq_enobufs1, 0, "# times ENOBUFS1 was returned from nq");
SYSCTL_INT(_net_brsc, OID_AUTO, enobufs2, CTLFLAG_RD,
    &brsc_nq_enobufs2, 0, "# times ENOBUFS2 was returned from nq");
SYSCTL_INT(_net_brsc, OID_AUTO, eowned, CTLFLAG_RD,
    &brsc_nq_eowned, 0, "# times lock was acquired in enqueue");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_domain_eq, CTLFLAG_RD,
    &brsc_nq_domain_eq, 0, "# times domain was set and matched");
SYSCTL_INT(_net_brsc, OID_AUTO, deferred, CTLFLAG_RD,
    &brsc_deferred, 0, "# times deferred");


/* Atomically bump brsc_<name>; expands to nothing when counters are off. */
#define DBG_COUNTER_INC(name) atomic_add_int(&(brsc_ ## name), 1)
#else
#define DBG_COUNTER_INC(name)
#endif
165
/*
 * One ring slot: a single pointer.  brsc_entry_set() stores NULL here when
 * the consumer advances past the slot.
 */
struct br_sc_entry_ {
	volatile void *bre_ptr;
};
169
170
/*
 * Flag bits carried in the upper bits of br_cons; the low bits (selected
 * with br_mask) hold the consumer index.  The constants are unsigned so
 * that bit 31 can be formed without shifting into the sign bit of an int.
 */
#define BR_RING_ABDICATING (1U<<31)
#define BR_RING_STALLED (1U<<30)
#define BR_RING_IDLE (1U<<29)
#define BR_RING_MAX (1U<<28)
#define BR_RING_MASK (BR_RING_MAX-1)
#define BR_RING_FLAGS_MASK (~(BR_RING_MASK))

/* Reduce a raw head/tail/cons value to a slot index. */
#define BR_INDEX(br, x) ((x) & (br)->br_mask)
/* Non-zero when the last consumer left the ring abdicated or idle. */
#define BR_HANDOFF(br) ((br)->br_cons & (BR_RING_ABDICATING|BR_RING_IDLE))
/*
 * Non-zero when the consumer reported a stall.  This function-like macro
 * shares its name with the br_state enumerator BR_STALLED; the enumerator is
 * never followed by '(' and is therefore not expanded.
 */
#define BR_STALLED(br) ((br)->br_cons & BR_RING_STALLED)
#define BR_CONS_IDX(br) ((br)->br_cons & (br)->br_mask)


/*
 * Flag bits carried in the upper bits of br_prod_head.  GEN is set when the
 * producer index wraps and cleared when the consumer index wraps, so that
 * cidx == pidx can be told apart as "full" (GEN set) or "empty" (GEN clear).
 */
#define BR_RING_GEN (1U<<29)
#define BR_RING_PENDING (1U<<30)
#define BR_RING_OWNED (1U<<31)

#define BR_GEN(br) ((br)->br_prod_head & BR_RING_GEN)

/* br_domain value meaning "ring is not bound to a domain". */
#define BR_NODOMAIN 0xffffffff
191
/*
 * Outcome of a drain pass, passed to buf_ring_sc_unlock() so it can mark
 * br_cons accordingly.  BR_BUSY has no handling in buf_ring_sc_unlock(),
 * which panics on it.
 */
typedef enum br_state_ {
	BR_IDLE = 1,		/* drain loop ended without stalling or abdicating */
	BR_ABDICATED,		/* budget exhausted with entries still queued */
	BR_BUSY,
	BR_STALLED		/* drain handler consumed nothing */
} br_state;
198
/*
 * Multi-producer ring with a single consumer at a time; the consumer role is
 * handed between threads using the OWNED/PENDING bits in br_prod_head.
 */
struct buf_ring_sc {
	/* reservation index (low bits) plus BR_RING_OWNED/PENDING/GEN flags */
	volatile uint32_t br_prod_head;
	/* index up to which entries have been published to the consumer */
	volatile uint32_t br_prod_tail;
	/*
	 * The following values are not expected to be modified
	 * while the ring is in use, so we put them on their own
	 * cache line to avoid false sharing when they are read
	 */
	int br_size __aligned(CACHE_LINE_SIZE);	/* number of entries */
	int br_mask;				/* br_size - 1 */
	int br_flags;				/* BR_FLAGS_* from the consumer descriptor */
	int br_domain;				/* bound domain or BR_NODOMAIN */
	counter_u64_t br_enqueues;
	counter_u64_t br_drops;
	counter_u64_t br_starts;
	counter_u64_t br_restarts;
	counter_u64_t br_abdications;
	counter_u64_t br_stalls;
	/* called with up to 'avail' ready entries; returns how many it consumed */
	int (*br_drain) (struct buf_ring_sc *br, int avail, void *sc);
	/* called to hand further draining off to the consumer's own context */
	void (*br_deferred) (struct buf_ring_sc *br, void *sc);
	void *br_sc;				/* opaque argument for both callbacks */
	struct thread *br_owner;		/* thread holding the consumer lock, else NULL */
	/* cache line aligned to avoid cache line invalidate traffic
	 * between consumer and producer (false sharing)
	 *
	 * Holds the consumer index (low bits) plus the
	 * BR_RING_ABDICATING/STALLED/IDLE flags.
	 */
	volatile uint32_t br_cons __aligned(CACHE_LINE_SIZE);
	/* cache line aligned to avoid false sharing with other data structures
	 * located just beyond the end of the ring
	 */
	struct br_sc_entry_ br_ring[0] __aligned(CACHE_LINE_SIZE);
};
231
/* Consumer-lock helpers; definitions are at the end of the file. */
static void buf_ring_sc_lock(struct buf_ring_sc *br);
static int buf_ring_sc_trylock(struct buf_ring_sc *br);
static int buf_ring_sc_unlock(struct buf_ring_sc *br, br_state state);

/* Retire 'count' consumed entries and publish the new consumer index. */
static void buf_ring_sc_advance(struct buf_ring_sc *br, int count);
237
/*
 * Many architectures other than x86 permit speculative re-ordering
 * of loads. Unfortunately, atomic_load_acq_32() is comparatively
 * expensive so we'd rather elide it if possible.
 *
 * The argument is a pointer to a 32-bit value; it is parenthesized in the
 * expansion so that expressions such as "p + 1" load from the intended
 * address.
 */
#if defined(__i386__) || defined(__amd64__)
#define ORDERED_LOAD_32(x) (*(x))
#else
#define ORDERED_LOAD_32(x) atomic_load_acq_32((x))
#endif
248
249
250 static inline int
brsc_get_inuse(struct buf_ring_sc * br,int cidx,int pidx,int gen)251 brsc_get_inuse(struct buf_ring_sc *br, int cidx, int pidx, int gen)
252 {
253 int used;
254
255 if (gen && pidx == cidx)
256 used = br->br_size;
257 else if (pidx >= cidx)
258 used = pidx - cidx; /* encompasses gen == 0 && pidx == cidx */
259 else
260 used = br->br_size - cidx + pidx;
261
262 return (used);
263 }
264
265 static inline int
brsc_get_avail(struct buf_ring_sc * br,int cidx,int pidx,int gen)266 brsc_get_avail(struct buf_ring_sc *br, int cidx, int pidx, int gen)
267 {
268
269 return (br->br_size - brsc_get_inuse(br, cidx, pidx, gen));
270 }
271
272 /*
273 * ring entry accessors to allow us to make ring entry
274 * alignment determined at runtime
275 */
276 static __inline void *
brsc_entry_get(struct buf_ring_sc * br,int i)277 brsc_entry_get(struct buf_ring_sc *br, int i)
278 {
279 volatile void *ent;
280
281 MPASS(i >= 0 && i < br->br_size);
282
283 if (br->br_flags & BR_FLAGS_ALIGNED)
284 ent = br->br_ring[i*ALIGN_SCALE].bre_ptr;
285 else
286 ent = br->br_ring[i].bre_ptr;
287 return ((void *)(uintptr_t)ent);
288 }
289
290 static __inline void
brsc_entry_set(struct buf_ring_sc * br,int i,void * buf)291 brsc_entry_set(struct buf_ring_sc *br, int i, void *buf)
292 {
293 volatile void *bufp;
294
295 MPASS(i >= 0 && i < br->br_size);
296
297 DBG_COUNTER_INC(nq_entry_sets);
298
299 if (br->br_flags & BR_FLAGS_ALIGNED)
300 bufp = (volatile void *)&br->br_ring[i*ALIGN_SCALE].bre_ptr;
301 else
302 bufp = (volatile void *)&br->br_ring[i].bre_ptr;
303
304 if (buf == NULL) {
305 DBG_COUNTER_INC(nq_entry_set_nulls);
306 MPASS(*(void * volatile *)bufp != NULL);
307 }
308 *((void * volatile *)bufp) = buf;
309 }
310
/*
 * Atomically clear BR_RING_GEN in br_prod_head, leaving the index and the
 * other flag bits untouched.  Called by buf_ring_sc_advance() when the
 * consumer index wraps.
 */
static inline void
brsc_gen_clear(struct buf_ring_sc *br)
{

	atomic_clear_acq_int(&br->br_prod_head, BR_RING_GEN);
}
317
318 struct buf_ring_sc *
buf_ring_sc_alloc(int count,struct malloc_type * type,int flags,struct buf_ring_sc_consumer * brsc)319 buf_ring_sc_alloc(int count, struct malloc_type *type, int flags,
320 struct buf_ring_sc_consumer *brsc)
321 {
322 struct buf_ring_sc *br;
323 int alloc_count = count;
324
325 KASSERT(powerof2(count), ("buf ring must be size power of 2"));
326 if (brsc->brsc_flags & BR_FLAGS_ALIGNED)
327 alloc_count = count*ALIGN_SCALE;
328 br = malloc(sizeof(struct buf_ring) + alloc_count*sizeof(caddr_t),
329 type, flags|M_ZERO);
330 if (br == NULL)
331 return (NULL);
332
333 br->br_drain = brsc->brsc_drain;
334 br->br_deferred = brsc->brsc_deferred;
335 br->br_flags = brsc->brsc_flags;
336 br->br_sc = brsc->brsc_sc;
337 if (brsc->brsc_flags & BR_FLAGS_NUMA)
338 br->br_domain = brsc->brsc_domain;
339 else
340 br->br_domain = BR_NODOMAIN;
341 br->br_size = count;
342 br->br_mask = count-1;
343 br->br_prod_head = br->br_prod_tail = 0;
344 br->br_cons = 0;
345 br->br_enqueues = counter_u64_alloc(flags);
346 br->br_drops = counter_u64_alloc(flags);
347 br->br_abdications = counter_u64_alloc(flags);
348 br->br_stalls = counter_u64_alloc(flags);
349 br->br_starts = counter_u64_alloc(flags);
350 br->br_restarts = counter_u64_alloc(flags);
351 buf_ring_sc_reset_stats(br);
352 return (br);
353 }
354
/*
 * Destroy a ring created by buf_ring_sc_alloc(): release its six statistics
 * counters, then the ring memory itself.  'type' is passed to free(9).
 * Entries still stored in the ring are not touched.
 */
void
buf_ring_sc_free(struct buf_ring_sc *br, struct malloc_type *type)
{
	counter_u64_free(br->br_enqueues);
	counter_u64_free(br->br_drops);
	counter_u64_free(br->br_abdications);
	counter_u64_free(br->br_stalls);
	counter_u64_free(br->br_starts);
	counter_u64_free(br->br_restarts);

	free(br, type);
}
367
/*
 * Zero every per-ring statistics counter.  Also called from
 * buf_ring_sc_alloc() on a freshly created ring.
 */
void
buf_ring_sc_reset_stats(struct buf_ring_sc *br)
{

	counter_u64_zero(br->br_enqueues);
	counter_u64_zero(br->br_drops);
	counter_u64_zero(br->br_abdications);
	counter_u64_zero(br->br_stalls);
	counter_u64_zero(br->br_starts);
	counter_u64_zero(br->br_restarts);
}
379
/*
 * Copy the current value of each per-ring counter into the caller-supplied
 * 'brss'.  The counters are fetched one after another, so the values are not
 * a single atomic snapshot.
 */
void
buf_ring_sc_get_stats_v0(struct buf_ring_sc *br, struct buf_ring_sc_stats_v0 *brss)
{
	brss->brs_enqueues = counter_u64_fetch(br->br_enqueues);
	brss->brs_drops = counter_u64_fetch(br->br_drops);
	brss->brs_abdications = counter_u64_fetch(br->br_abdications);
	brss->brs_stalls = counter_u64_fetch(br->br_stalls);
	brss->brs_starts = counter_u64_fetch(br->br_starts);
	brss->brs_restarts = counter_u64_fetch(br->br_restarts);
}
390
391 static br_state
buf_ring_sc_drain_locked(struct buf_ring_sc * br,int budget)392 buf_ring_sc_drain_locked(struct buf_ring_sc *br, int budget)
393 {
394 uint32_t cidx = BR_CONS_IDX(br);
395 uint32_t pidx = br->br_prod_tail;
396 uint32_t gen = BR_GEN(br);
397 uint32_t n;
398 int inuse, prod_avail;
399 br_state state;
400
401 DBG_COUNTER_INC(drains);
402
403 KASSERT(budget > 0, ("calling drain with invalid budget"));
404 MPASS(br->br_prod_head & BR_RING_OWNED);
405 MPASS(br->br_owner == curthread);
406 state = BR_IDLE;
407 while (budget && (cidx != pidx || (gen && (pidx == cidx)))) {
408 inuse = brsc_get_inuse(br, cidx, pidx, gen);
409 prod_avail = min(inuse, budget);
410 MPASS(prod_avail > 0);
411 DBG_COUNTER_INC(drain_handled);
412 MPASS(br->br_owner == curthread);
413 n = br->br_drain(br, prod_avail, br->br_sc);
414 MPASS(br->br_owner == curthread);
415 KASSERT(n <= prod_avail, ("drain handler return invalid count"));
416 if (n == 0) {
417 state = BR_STALLED;
418 break;
419 }
420
421 buf_ring_sc_advance(br, n);
422 budget -= n;
423 if (budget == 0) {
424 if (BR_CONS_IDX(br) != br->br_prod_tail)
425 state = BR_ABDICATED;
426 else
427 state = BR_IDLE;
428 break;
429 }
430 gen = BR_GEN(br);
431 pidx = br->br_prod_tail;
432 cidx = BR_CONS_IDX(br);
433 }
434
435 return (state);
436 }
437
438 void
buf_ring_sc_drain(struct buf_ring_sc * br,int budget)439 buf_ring_sc_drain(struct buf_ring_sc *br, int budget)
440 {
441 br_state state;
442 int pending;
443
444 KASSERT(budget >= 0, ("calling drain with invalid budget"));
445 critical_enter();
446 if (br->br_domain == PCPU_GET(domain))
447 sched_pin();
448 else if (br->br_domain != BR_NODOMAIN) {
449 br->br_deferred(br, br->br_sc);
450 critical_exit();
451 return;
452 }
453 critical_exit();
454
455 if (__predict_false(budget == 0)) {
456 buf_ring_sc_lock(br);
457 budget = br->br_size;
458 } else if (!buf_ring_sc_trylock(br)) {
459 if (br->br_domain == PCPU_GET(domain))
460 sched_unpin();
461 return;
462 }
463
464 state = buf_ring_sc_drain_locked(br, budget);
465 if (br->br_domain == PCPU_GET(domain))
466 sched_unpin();
467 pending = buf_ring_sc_unlock(br, state);
468 if ((state == BR_ABDICATED) && pending == 0)
469 br->br_deferred(br, br->br_sc);
470 }
471
472 /*
473 * Multi-producer safe lock-free ring buffer enqueue
474 *
475 * Most architectures do not support the atomic update of multiple
476 * discontiguous locations. So it is not possible to atomically update
477 * the producer index and ring buffer entry. To side-step this limitation
478 * we split update in to 3 steps:
479 * 1) atomically acquiring an index
480 * 2) updating the corresponding ring entry
481 * 3) making the update available to the consumer
482 * In order to split the index update in to an acquire and release
483 * phase there are _two_ producer indexes. 'prod_head' is used for
484 * step 1) and is thus only used by the enqueue itself. 'prod_tail'
485 * is used for step 3) to signal to the consumer that the update is
 * complete. To guarantee memory ordering the update of 'prod_tail' is
 * done with an atomic_store_rel_32(...) and the corresponding
 * initial read of 'prod_tail' by the dequeue functions is done with
 * an atomic_load_acq_32(...).
490 *
 * Regarding memory ordering - there are five variables in question:
 * (br_) prod_head, prod_tail, cons, ring[idx={cons, prod}]
 * It's easiest to examine correctness by considering the consequence of
 * reading a stale value or having an update become visible prior to
 * preceding writes.
496 *
497 * - prod_head: this is only read by the enqueue routine, if the latter were to
498 * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32)
499 * would fail. However, the implied memory barrier in cmpxchg would cause the
500 * subsequent read of prod_head to read the up-to-date value permitting the
501 * cmpxchg to succeed the second time.
502 *
503 * - prod_tail: This value is used by dequeue to determine the effective
504 * producer index. On architectures with weaker memory ordering than x86 it
505 * needs special handling. In enqueue it needs to be updated with
506 * atomic_store_rel_32() (i.e. a write memory barrier before update) to
507 * guarantee that the new ring value is committed to memory before it is
508 * made available by prod_tail. In dequeue to guarantee that it is read before
509 * br_ring[cons] it needs to be read with atomic_load_acq_32().
510 *
511 *
 * - cons: This is used to communicate the latest consumer index between
 * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
 * to fail erroneously. To avoid a load being re-ordered after a store (and
 * thus permitting enqueue to store a new value before the old one has been
 * consumed) it is updated with an atomic_store_rel_32() in dequeue.
517 *
518 * - ring[idx] : Updates to this value need to reach memory before the subsequent
519 * update to prod_tail does. Reads need to happen before subsequent updates to
520 * cons.
521 *
522 * Some implementation notes:
523 * - Much like a simpler single-producer single consumer ring buffer,
524 * the producer can not produce faster than the consumer. Hence the
525 * check of 'prod_head' + 1 against 'cons'.
526 *
527 * - The use of "prod_next = (prod_head + 1) & br->br_mask" to
528 * calculate the next index is slightly cheaper than a modulo but
529 * requires the ring to be power-of-2 sized.
530 *
531 * - The critical_enter() / critical_exit() are not required for
532 * correctness. They prevent updates from stalling by having a producer be
533 * preempted after updating 'prod_head' but before updating 'prod_tail'.
534 *
 * - The "while (br->br_prod_tail != prod_head)" check assures in-order
 * completion, because a later producer might reach this point before
 * an earlier producer has published its entries. Waiting here is what
 * allows us to update 'prod_tail' with a plain release store rather
 * than a cmpxchg / LOCK prefix.
540 *
541 *
542 * This buf_ring has the following FSM:
543 * producer:
544 * - !owned -> owned(curthread) + enqueue
545 * - pending(!curthread) -> enqueue
546 * - owned + abdicating -> owned + abdicating + pending(curthread)
547 * - owned + abdicating + pending(curthread) ->
548 * owned + abdicating + pending(curthread) + enqueue
549 * - owned + abdicating + pending(curthread) + enqueue ->
550 * wait(!owned)
551 * - !owned + abdicating + pending(curthread) ->
552 * owned + busy + pending(curthread) ->
553 * owned + busy (consumer)
554 * consumer (i.e. owned(curthread)):
555 * - busy + owned -> abdicating + owned
556 * - abdicating + owned + pending -> abdicating + unowned + pending
557 * - abdicating + owned -> abdicating + unowned + enqueue tx task
558 * How do we handle abdication when the ring is full
559 */
560
561 int
buf_ring_sc_enqueue(struct buf_ring_sc * br,void * ents[],int count,int budget)562 buf_ring_sc_enqueue(struct buf_ring_sc *br, void *ents[], int count, int budget)
563 {
564 uint32_t prod_head, prod_next, cons;
565 uint32_t pidx, cidx, pidx_next;
566 int i, pending, rc, avail, domainvalid;
567 #ifdef DEBUG_BUFRING
568 int j;
569 for (i = BR_CONS_IDX(br); i != ORDERED_LOAD_32(&br->br_prod_tail);
570 i = ((i + 1) & br->br_mask))
571 for (j = 0; j < count; j++)
572 if (brsc_entry_get(br, i) == ents[j])
573 panic("buf=%p already enqueue at %d prod=%d cons=%d",
574 ents[j], i, br->br_prod_tail, BR_CONS_IDX(br));
575 #endif
576 MPASS(count > 0);
577 DBG_COUNTER_INC(nqs);
578 critical_enter();
579
580 if (br->br_domain == BR_NODOMAIN || br->br_domain == PCPU_GET(domain))
581 domainvalid = 1;
582 prod_head = br->br_prod_head;
583 pending = false;
584 rc = 0;
585
586 /*
587 * If the current consumer abdicated and no one has set the pending bit yet
588 * we loop until we set it we're the next lock holder - or if the owner
589 * drops the lock before we can do that then the lock will be
590 * re-acquired normally
591 */
592 while (BR_HANDOFF(br) && (prod_head & BR_RING_FLAGS_MASK) == BR_RING_OWNED && budget > 0 && domainvalid) {
593 prod_head = br->br_prod_head;
594 pidx = BR_INDEX(br, prod_head);
595 cons = br->br_cons;
596 cidx = BR_INDEX(br, cons);
597 avail = brsc_get_avail(br, cidx, pidx, BR_GEN(br));
598
599 if (count > avail) {
600 if (pidx != BR_INDEX(br, atomic_load_acq_32(&br->br_prod_head)) ||
601 cidx != BR_INDEX(br, atomic_load_acq_32(&br->br_cons)))
602 continue;
603 critical_exit();
604 counter_u64_add(br->br_drops, 1);
605 DBG_COUNTER_INC(nq_enobufs1);
606 return (ENOBUFS);
607 }
608
609 pidx_next = BR_INDEX(br, pidx + count);
610 /* set the pending bit and preserve the owned bit */
611 prod_next = pidx_next | BR_RING_PENDING | (prod_head & BR_RING_FLAGS_MASK);
612 /* we wrapped set the gen bit to indicate that cidx==pidx => ring full */
613 if (pidx_next < pidx)
614 prod_next |= BR_RING_GEN;
615 if (atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next)) {
616 pending = true;
617 DBG_COUNTER_INC(nq_pendings);
618 goto done;
619 }
620 prod_head = br->br_prod_head;
621 }
622 do {
623 rc = 0;
624 prod_head = br->br_prod_head;
625 pidx = BR_INDEX(br, prod_head);
626 cidx = BR_INDEX(br, br->br_cons);
627 avail = brsc_get_avail(br, cidx, pidx, BR_GEN(br));
628
629 if (count > avail) {
630 /* ensure that we only return ENOBUFS
631 * if the latest value matches what we read
632 */
633 if (pidx != BR_INDEX(br, atomic_load_acq_32(&br->br_prod_head)) ||
634 cidx != BR_INDEX(br, atomic_load_acq_32(&br->br_cons)))
635 continue;
636 critical_exit();
637 counter_u64_add(br->br_drops, 1);
638 DBG_COUNTER_INC(nq_enobufs2);
639 return (ENOBUFS);
640 }
641 prod_next = pidx_next = BR_INDEX(br, pidx + count);
642 /* preserve flags */
643 prod_next |= (prod_head & BR_RING_FLAGS_MASK);
644
645 /* we wrapped - set the gen bit to indicate that cidx==pidx => ring full */
646 if (pidx_next < pidx)
647 prod_next |= BR_RING_GEN;
648 /*
649 * If the ring is unowned, not pending, the budget is non-zero, and we're
650 * in the right domain, try to acquire the lock.
651 */
652 if ((prod_head & BR_RING_FLAGS_MASK) == 0 && budget > 0 && domainvalid) {
653 prod_next |= BR_RING_OWNED;
654 DBG_COUNTER_INC(nq_eowned);
655 rc = EOWNED;
656 }
657 } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
658 done:
659 for (i = 0; i < count; i++) {
660 MPASS(ents[i] != NULL);
661 brsc_entry_set(br, (BR_INDEX(br, prod_head)-(count-1))+i, ents[i]);
662 }
663 /*
664 * If there are other enqueues in progress
665 * that preceded us, we need to wait for them
666 * to complete
667 * re-ordering of reads would not effect correctness
668 */
669 while (br->br_prod_tail != BR_INDEX(br, prod_head))
670 cpu_spinwait();
671 /* ensure that the ring update reaches memory before the new
672 * value of prod_tail
673 */
674 atomic_store_rel_32(&br->br_prod_tail, BR_INDEX(br, prod_next));
675
676 /* now that we've completed the enqueue if we know that we're
677 * the next owner we need to wait for the current owner to clear
678 * the owned bit
679 */
680 if (pending) {
681 while ((br->br_prod_head & BR_RING_OWNED) == BR_RING_OWNED)
682 cpu_spinwait();
683 atomic_set_acq_32(&br->br_prod_head, BR_RING_OWNED);
684 DBG_COUNTER_INC(nq_eowned);
685 rc = EOWNED;
686 }
687 if (rc == EOWNED) {
688 MPASS(br->br_owner == NULL);
689 MPASS((br->br_prod_head & BR_RING_OWNED) == BR_RING_OWNED);
690 /* clear the flags bits from cons */
691 br->br_cons &= ~BR_RING_FLAGS_MASK;
692 br->br_owner = curthread;
693 if (br->br_domain == PCPU_GET(domain))
694 sched_pin();
695 }
696 /*
697 * we became owner by way of the contested abdicate clear pending
698 */
699 if (pending)
700 atomic_clear_rel_32(&br->br_prod_head, BR_RING_PENDING);
701
702 critical_exit();
703 counter_u64_add(br->br_enqueues, 1);
704
705 /* we're the owner - our buffers were enqueued */
706 if (rc == EOWNED) {
707 br_state state = buf_ring_sc_drain_locked(br, budget);
708 if (br->br_domain == PCPU_GET(domain))
709 sched_unpin();
710 if (buf_ring_sc_unlock(br, state) == 0 && state == BR_ABDICATED) {
711 DBG_COUNTER_INC(deferred);
712 br->br_deferred(br, br->br_sc); /* consumer's re-drive mechanism */
713 }
714 }
715 #if 0
716 else if (BR_STALLED(br)) {
717 /* buffer enqueued - but we can't do any work */
718 return (ESTALLED);
719 }
720 #endif
721 return (0);
722 }
723
724 /*
725 * populate ents with up to count values from the ring
726 * and return the number of entries
727 */
728 int
buf_ring_sc_peek(struct buf_ring_sc * br,void * ents[],uint16_t count)729 buf_ring_sc_peek(struct buf_ring_sc *br, void *ents[], uint16_t count)
730 {
731 uint32_t cons;
732 uint32_t prod_tail;
733 int i, inuse, prod_avail;
734
735 KASSERT(count > 0, ("peeking for zero entries"));
736 KASSERT((br->br_prod_head & BR_RING_OWNED) == BR_RING_OWNED, ("peeking without lock being held"));
737 MPASS(br->br_owner == curthread);
738 /*
739 * for correctness prod_tail must be read before ring[cons]
740 */
741 cons = BR_CONS_IDX(br);
742 prod_tail = ORDERED_LOAD_32(&br->br_prod_tail);
743 if ((inuse = brsc_get_inuse(br, cons, prod_tail, BR_GEN(br))) == 0)
744 return (0);
745
746 prod_avail = min(inuse, count);
747 for (i = 0; i < prod_avail; i++) {
748 DBG_COUNTER_INC(nq_entry_gets);
749 ents[i] = brsc_entry_get(br, BR_INDEX(br, cons + i));
750 MPASS(ents[i] != NULL);
751 }
752 return (prod_avail);
753 }
754
755 /*
756 * Used to return a buffer (most likely already there)
757 * to the top od the ring. The caller should *not*
758 * have used any dequeue to pull it out of the ring
759 * but instead should have used the peek() function.
760 * This is normally used where the transmit queue
761 * of a driver is full, and an mubf must be returned.
762 * Most likely whats in the ring-buffer is what
763 * is being put back (since it was not removed), but
764 * sometimes the lower transmit function may have
765 * done a pullup or other function that will have
766 * changed it. As an optimization we always put it
767 * back (since jhb says the store is probably cheaper),
768 * if we have to do a multi-queue version we will need
769 * the compare and an atomic.
770 *
771 */
772 void
buf_ring_sc_putback(struct buf_ring_sc * br,void * new,int idx)773 buf_ring_sc_putback(struct buf_ring_sc *br, void *new, int idx)
774 {
775 KASSERT(BR_CONS_IDX(br) != br->br_prod_tail,
776 ("Buf-Ring has none in putback")) ;
777 brsc_entry_set(br, BR_CONS_IDX(br) + idx, new);
778 }
779
780 /*
781 * @count: the number of entries by which to advance the consumer index
782 *
783 */
784 static void
buf_ring_sc_advance(struct buf_ring_sc * br,int count)785 buf_ring_sc_advance(struct buf_ring_sc *br, int count)
786 {
787 uint32_t cons, cons_next;
788 uint32_t prod_tail;
789 int i;
790
791 KASSERT(count > 0, ("invalid advance count"));
792 MPASS(br->br_owner == curthread);
793
794 DBG_COUNTER_INC(nq_entry_advances);
795
796 cons = BR_CONS_IDX(br);
797 prod_tail = br->br_prod_tail;
798 cons_next = BR_INDEX(br, cons + count);
799
800 /*
801 * Storing NULL here serves two purposes:
802 * 1) it assures that the load of ring[cons] has completed
803 * (only the most perverted architecture or compiler would
804 * consider re-ordering a = *x; *x = b)
805 * 2) it allows us to enforce global ordering of the cons
806 * update with an atomic_store_rel_32
807 */
808 for (i = 0; i < count; i++)
809 brsc_entry_set(br, BR_INDEX(br, cons + i), NULL);
810
811 if (cons_next < cons) {
812 MPASS(cons_next <= prod_tail);
813 brsc_gen_clear(br);
814 }
815 atomic_store_rel_32(&br->br_cons, cons_next);
816
817 }
818
819 /*
820 * mark the ring as being abdicated
821 */
822 void
buf_ring_sc_abdicate(struct buf_ring_sc * br)823 buf_ring_sc_abdicate(struct buf_ring_sc *br)
824 {
825 uint32_t cons_next;
826
827 cons_next = br->br_cons | BR_RING_ABDICATING;
828 counter_u64_add(br->br_abdications, 1);
829 critical_enter();
830
831 atomic_store_rel_32(&br->br_cons, cons_next);
832 }
833
834 int
buf_ring_sc_count(struct buf_ring_sc * br)835 buf_ring_sc_count(struct buf_ring_sc *br)
836 {
837 /* br_cons and br_prod_tail may be stale but the consumer
838 * understands that this is only a point in time snapshot
839 */
840
841 return (brsc_get_inuse(br, BR_INDEX(br, br->br_cons), br->br_prod_tail, BR_GEN(br)));
842 }
843
844 int
buf_ring_sc_empty(struct buf_ring_sc * br)845 buf_ring_sc_empty(struct buf_ring_sc *br)
846 {
847 /* br_prod_tail may be stale but the consumer understands that this is
848 * only a point in time snapshot
849 */
850
851 return ((BR_GEN(br) == 0) && BR_INDEX(br, br->br_cons) == br->br_prod_tail);
852 }
853
854 int
buf_ring_sc_full(struct buf_ring_sc * br)855 buf_ring_sc_full(struct buf_ring_sc *br)
856 {
857 /* br_cons may be stale but the caller understands that this is
858 * only a point in time snapshot
859 */
860 return (BR_GEN(br) && (BR_INDEX(br, br->br_cons) == br->br_prod_tail));
861 }
862
863 /*
864 * currently being used only for flushing all buffers
865 */
866 static void
buf_ring_sc_lock(struct buf_ring_sc * br)867 buf_ring_sc_lock(struct buf_ring_sc *br)
868 {
869 uint32_t value;
870
871 do {
872 while ((value = br->br_prod_head) & BR_RING_PENDING)
873 cpu_spinwait();
874 } while (!atomic_cmpset_acq_32(&br->br_prod_head, value, value | BR_RING_PENDING));
875 do {
876 while ((value = br->br_prod_head) & BR_RING_OWNED)
877 cpu_spinwait();
878 } while (!atomic_cmpset_acq_32(&br->br_prod_head, value, value | BR_RING_OWNED));
879 br->br_owner = curthread;
880 if (br->br_cons & BR_RING_IDLE)
881 counter_u64_add(br->br_starts, 1);
882 else if (br->br_cons & BR_RING_STALLED)
883 counter_u64_add(br->br_restarts, 1);
884 br->br_cons &= ~(BR_RING_IDLE|BR_RING_ABDICATING|BR_RING_STALLED);
885 atomic_clear_rel_32(&br->br_prod_head, BR_RING_PENDING);
886 }
887
888
889 static int
buf_ring_sc_trylock(struct buf_ring_sc * br)890 buf_ring_sc_trylock(struct buf_ring_sc *br)
891 {
892 uint32_t value;
893
894 do {
895 value = br->br_prod_head;
896 if (value & (BR_RING_OWNED|BR_RING_PENDING))
897 return (0);
898 } while (!atomic_cmpset_acq_32(&br->br_prod_head, value, value | BR_RING_OWNED | BR_RING_PENDING));
899
900 MPASS(br->br_owner == NULL);
901 MPASS((br->br_prod_head & (BR_RING_OWNED|BR_RING_PENDING)) == (BR_RING_OWNED|BR_RING_PENDING));
902 br->br_cons &= ~(BR_RING_IDLE|BR_RING_ABDICATING|BR_RING_STALLED);
903 atomic_clear_rel_32(&br->br_prod_head, BR_RING_PENDING);
904 br->br_owner = curthread;
905 if (br->br_cons & BR_RING_IDLE)
906 counter_u64_add(br->br_starts, 1);
907 else if (br->br_cons & BR_RING_STALLED)
908 counter_u64_add(br->br_restarts, 1);
909
910 return (1);
911 }
912
913 static int
buf_ring_sc_unlock(struct buf_ring_sc * br,br_state reason)914 buf_ring_sc_unlock(struct buf_ring_sc *br, br_state reason)
915 {
916 uint32_t prod_head, cons_next;
917 int pending;
918
919 KASSERT(br->br_prod_head & BR_RING_OWNED, ("unlocking unowned ring"));
920 /*
921 * we treat IDLE the same as ABDICATE to avoid a race
922 * with enqueue - they only differ for purposes of stats
923 * keeping
924 */
925 if (reason == BR_IDLE) {
926 cons_next = br->br_cons | BR_RING_IDLE;
927 critical_enter();
928 atomic_store_rel_32(&br->br_cons, cons_next);
929 } else if ((reason == BR_ABDICATED) &&
930 (br->br_cons & BR_RING_ABDICATING) == 0) {
931 cons_next = br->br_cons | BR_RING_ABDICATING;
932 counter_u64_add(br->br_abdications, 1);
933 critical_enter();
934 atomic_store_rel_32(&br->br_cons, cons_next);
935 } else if (reason == BR_STALLED) {
936 cons_next = br->br_cons | BR_RING_STALLED;
937 counter_u64_add(br->br_stalls, 1);
938 critical_enter();
939 atomic_store_rel_32(&br->br_cons, cons_next);
940 } else
941 panic("invalid unlock state");
942 br->br_owner = NULL;
943 do {
944 prod_head = br->br_prod_head;
945 pending = !!(prod_head & BR_RING_PENDING);
946 } while (!atomic_cmpset_rel_32(&br->br_prod_head, prod_head, prod_head & ~BR_RING_OWNED));
947 critical_exit();
948 return (pending);
949 }
950