xref: /NextBSD/sys/kern/subr_bufring.c (revision d3c636c9b89dbfe48820ea0171c6f6ab6e41d24a)
1 /*-
2  * Copyright (c) 2007-2015 Matt Macy <mmacy@nextbsd.org>
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  */
26 
27 #include <sys/cdefs.h>
28 __FBSDID("$FreeBSD$");
29 
30 
31 #include <sys/param.h>
32 #include <sys/systm.h>
33 #include <sys/counter.h>
34 #include <sys/kernel.h>
35 #include <sys/libkern.h>
36 #include <sys/malloc.h>
37 #include <sys/ktr.h>
38 #include <sys/pcpu.h>
39 #include <sys/proc.h>
40 #include <sys/sched.h>
41 #include <sys/sysctl.h>
42 
43 #include <sys/buf_ring.h>
44 #include <sys/buf_ring_sc.h>
45 
/* Private status codes (EOWNED is produced by the enqueue path). */
#define	ESTALLED	254	/* consumer is stalled */
#define	EOWNED		255	/* consumer lock acquired */

/* Pointer-sized slots per cache line: slot stride of BR_FLAGS_ALIGNED rings. */
#define	ALIGN_SCALE	((CACHE_LINE_SIZE) / sizeof(caddr_t))
50 
51 
52 static struct buf_ring *
buf_ring_alloc_(int count,struct malloc_type * type,int flags,struct mtx * lock,int brflags)53 buf_ring_alloc_(int count, struct malloc_type *type, int flags, struct mtx *lock, int brflags)
54 {
55 	struct buf_ring *br;
56 	int alloc_count;
57 
58 	KASSERT(powerof2(count), ("buf ring must be size power of 2"));
59 	alloc_count = (brflags & BR_FLAGS_ALIGNED) ? (count * ALIGN_SCALE) : count;
60 
61 	br = malloc(sizeof(struct buf_ring) + alloc_count*sizeof(caddr_t),
62 	    type, flags|M_ZERO);
63 	if (br == NULL)
64 		return (NULL);
65 	br->br_flags = brflags;
66 #ifdef DEBUG_BUFRING
67 	br->br_lock = lock;
68 #endif
69 	br->br_prod_size = br->br_cons_size = count;
70 	br->br_prod_mask = br->br_cons_mask = count-1;
71 	br->br_prod_head = br->br_cons_head = 0;
72 	br->br_prod_tail = br->br_cons_tail = 0;
73 
74 	return (br);
75 }
76 
/*
 * Allocate a legacy buf_ring with @count (power of 2) entries and no
 * special layout flags.  See buf_ring_alloc_() for the parameters.
 */
struct buf_ring *
buf_ring_alloc(int count, struct malloc_type *type, int flags, struct mtx *lock)
{
	struct buf_ring *ring;

	ring = buf_ring_alloc_(count, type, flags, lock, 0);
	return (ring);
}
83 
84 struct buf_ring *
buf_ring_aligned_alloc(int count,struct malloc_type * type,int flags,struct mtx * lock)85 buf_ring_aligned_alloc(int count, struct malloc_type *type, int flags, struct mtx *lock)
86 {
87 
88 	return (buf_ring_alloc_(count, type, flags, lock, BR_FLAGS_ALIGNED));
89 }
90 
91 void
buf_ring_free(struct buf_ring * br,struct malloc_type * type)92 buf_ring_free(struct buf_ring *br, struct malloc_type *type)
93 {
94 
95 	free(br, type);
96 }
97 
98 /*
99  * buf_ring_sc definitions follow
100  */
101 
/*
 * Debug event counters for buf_ring_sc, exported read-only under
 * net.brsc.*.  Enabled by default only in INVARIANTS kernels.
 */
#ifndef BRSC_DEBUG_COUNTERS
#ifdef INVARIANTS
#define BRSC_DEBUG_COUNTERS 1
#else
#define BRSC_DEBUG_COUNTERS 0
#endif /* INVARIANTS */
#endif /* !BRSC_DEBUG_COUNTERS */

#if BRSC_DEBUG_COUNTERS
static SYSCTL_NODE(_net, OID_AUTO, brsc, CTLFLAG_RD, 0,
    "buf_ring_stats");

static int brsc_drains;
static int brsc_drain_handled;
static int brsc_nqs;
static int brsc_nq_pendings;
static int brsc_nq_entry_sets;
static int brsc_nq_entry_putbacks;
static int brsc_nq_entry_set_nulls;
static int brsc_nq_entry_gets;
static int brsc_nq_entry_advances;

static int brsc_nq_enobufs1;
static int brsc_nq_enobufs2;
static int brsc_nq_eowned;
static int brsc_nq_domain_eq;
static int brsc_deferred;

SYSCTL_INT(_net_brsc, OID_AUTO, drain, CTLFLAG_RD,
    &brsc_drains, 0, "# times drain was called");
SYSCTL_INT(_net_brsc, OID_AUTO, drain_handled, CTLFLAG_RD,
    &brsc_drain_handled, 0, "# times drain handler was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nqs, CTLFLAG_RD,
    &brsc_nqs, 0, "# times enqueue was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_pendings, CTLFLAG_RD,
    &brsc_nq_pendings, 0, "# times pending was set in nq");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_sets, CTLFLAG_RD,
    &brsc_nq_entry_sets, 0, "# times entry_set was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_gets, CTLFLAG_RD,
    &brsc_nq_entry_gets, 0, "# times entry_get was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_putbacks, CTLFLAG_RD,
    &brsc_nq_entry_putbacks, 0, "# times entry_putback was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_advances, CTLFLAG_RD,
    &brsc_nq_entry_advances, 0, "# times entry_advance was called");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_entry_set_nulls, CTLFLAG_RD,
    &brsc_nq_entry_set_nulls, 0, "# times entry_set was called with null");
SYSCTL_INT(_net_brsc, OID_AUTO, enobufs1, CTLFLAG_RD,
    &brsc_nq_enobufs1, 0, "# times ENOBUFS1 was returned from nq");
SYSCTL_INT(_net_brsc, OID_AUTO, enobufs2, CTLFLAG_RD,
    &brsc_nq_enobufs2, 0, "# times ENOBUFS2 was returned from nq");
SYSCTL_INT(_net_brsc, OID_AUTO, eowned, CTLFLAG_RD,
    &brsc_nq_eowned, 0, "# times lock was acquired in enqueue");
SYSCTL_INT(_net_brsc, OID_AUTO, nq_domain_eq, CTLFLAG_RD,
    &brsc_nq_domain_eq, 0, "# times domain was set and matched");
SYSCTL_INT(_net_brsc, OID_AUTO, deferred, CTLFLAG_RD,
    &brsc_deferred, 0, "# times deferred");

/* Bump brsc_<name>; safe from any context. */
#define DBG_COUNTER_INC(name) atomic_add_int(&(brsc_ ## name), 1)
#else
/* Expand to a real (empty) statement so "if (x) DBG_COUNTER_INC(y);" stays well-formed. */
#define DBG_COUNTER_INC(name) do { } while (0)
#endif
165 
/*
 * One ring slot.  The slot (the pointer itself) is volatile-qualified:
 * producers store into it and the consumer reads and NULLs it concurrently,
 * so every access must really hit memory.  A "volatile void *" member would
 * only qualify the pointed-to object, leaving reads of the slot cacheable.
 */
struct br_sc_entry_ {
	void *volatile	bre_ptr;
};
169 
170 
/*
 * Consumer state flags kept in the upper bits of br_cons; the low bits
 * hold the consumer index.  Unsigned constants: "1 << 31" on a signed
 * int is undefined behavior.
 */
#define BR_RING_ABDICATING	(1U << 31)
#define BR_RING_STALLED		(1U << 30)
#define BR_RING_IDLE		(1U << 29)
#define BR_RING_MAX		(1U << 28)	/* indexes must stay below this */
#define BR_RING_MASK		(BR_RING_MAX - 1)
#define BR_RING_FLAGS_MASK	(~(BR_RING_MASK))

/* Ring-index and consumer-state accessors (arguments fully parenthesized). */
#define BR_INDEX(br, x)		((x) & (br)->br_mask)
#define BR_HANDOFF(br)		((br)->br_cons & (BR_RING_ABDICATING | BR_RING_IDLE))
#define BR_STALLED(br)		((br)->br_cons & BR_RING_STALLED)
#define BR_CONS_IDX(br)		((br)->br_cons & (br)->br_mask)

/*
 * Producer flags kept in the upper bits of br_prod_head: GEN marks the
 * producer as one lap ahead of the consumer (equal indexes => full),
 * PENDING reserves the next ownership, OWNED is the consumer lock.
 */
#define BR_RING_GEN		(1U << 29)
#define BR_RING_PENDING		(1U << 30)
#define BR_RING_OWNED		(1U << 31)

#define BR_GEN(br)		((br)->br_prod_head & BR_RING_GEN)

/* br_domain value for rings that are not bound to a NUMA domain. */
#define BR_NODOMAIN		0xffffffff
191 
/*
 * Reason a consumer gives up the ring; passed to buf_ring_sc_unlock()
 * and returned by the drain loop.
 */
typedef enum br_state_ {
	BR_IDLE = 1,		/* ring found empty */
	BR_ABDICATED = 2,	/* budget exhausted with work left */
	BR_BUSY = 3,
	BR_STALLED = 4		/* drain handler made no progress */
} br_state;
198 
199 struct buf_ring_sc {
200 	volatile uint32_t	br_prod_head;
201 	volatile uint32_t	br_prod_tail;
202 	/*
203 	 * The following values are not expected to be modified
204 	 * while the ring is in use, so we put them on their own
205 	 * cache line to avoid false sharing when they are read
206 	 */
207 	int              	br_size __aligned(CACHE_LINE_SIZE);
208 	int              	br_mask;
209 	int			br_flags;
210 	int			br_domain;
211 	counter_u64_t		br_enqueues;
212 	counter_u64_t		br_drops;
213 	counter_u64_t		br_starts;
214 	counter_u64_t		br_restarts;
215 	counter_u64_t		br_abdications;
216 	counter_u64_t		br_stalls;
217 	int (*br_drain) (struct buf_ring_sc *br, int avail, void *sc);
218 	void (*br_deferred) (struct buf_ring_sc *br, void *sc);
219 	void *br_sc;
220 	struct thread		*br_owner;
221 	/* cache line aligned to avoid cache line invalidate traffic
222 	 * between consumer and producer (false sharing)
223 	 *
224 	 */
225 	volatile uint32_t	br_cons __aligned(CACHE_LINE_SIZE);
226 	/* cache line aligned to avoid false sharing with other data structures
227 	 * located just beyond the end of the ring
228 	 */
229 	struct br_sc_entry_	br_ring[0] __aligned(CACHE_LINE_SIZE);
230 };
231 
232 static void buf_ring_sc_lock(struct buf_ring_sc *br);
233 static int buf_ring_sc_trylock(struct buf_ring_sc *br);
234 static int buf_ring_sc_unlock(struct buf_ring_sc *br, br_state state);
235 
236 static void buf_ring_sc_advance(struct buf_ring_sc *br, int count);
237 
/*
 * Many architectures other than x86 permit speculative re-ordering
 * of loads. Unfortunately, atomic_load_acq_32() is comparatively
 * expensive so we'd rather elide it if possible.  On x86 a plain load
 * of the (volatile) location is used instead.
 */
#if defined(__i386__) || defined(__amd64__)
#define ORDERED_LOAD_32(x) (*(x))
#else
#define ORDERED_LOAD_32(x) atomic_load_acq_32((x))
#endif
248 
249 
250 static inline int
brsc_get_inuse(struct buf_ring_sc * br,int cidx,int pidx,int gen)251 brsc_get_inuse(struct buf_ring_sc *br, int cidx, int pidx, int gen)
252 {
253 	int used;
254 
255 	if (gen && pidx == cidx)
256 		used = br->br_size;
257 	else if (pidx >= cidx)
258 		used = pidx - cidx; /* encompasses gen == 0 && pidx == cidx */
259 	else
260 		used = br->br_size - cidx + pidx;
261 
262 	return (used);
263 }
264 
265 static inline int
brsc_get_avail(struct buf_ring_sc * br,int cidx,int pidx,int gen)266 brsc_get_avail(struct buf_ring_sc *br, int cidx, int pidx, int gen)
267 {
268 
269 	return (br->br_size - brsc_get_inuse(br, cidx, pidx, gen));
270 }
271 
272 /*
273  * ring entry accessors to allow us to make ring entry
274  * alignment determined at runtime
275  */
276 static __inline void *
brsc_entry_get(struct buf_ring_sc * br,int i)277 brsc_entry_get(struct buf_ring_sc *br, int i)
278 {
279 	volatile void *ent;
280 
281 	MPASS(i >= 0 && i < br->br_size);
282 
283 	if (br->br_flags & BR_FLAGS_ALIGNED)
284 		ent = br->br_ring[i*ALIGN_SCALE].bre_ptr;
285 	else
286 		ent = br->br_ring[i].bre_ptr;
287 	return ((void *)(uintptr_t)ent);
288 }
289 
290 static __inline void
brsc_entry_set(struct buf_ring_sc * br,int i,void * buf)291 brsc_entry_set(struct buf_ring_sc *br, int i, void *buf)
292 {
293 	volatile void *bufp;
294 
295 	MPASS(i >= 0 && i < br->br_size);
296 
297 	DBG_COUNTER_INC(nq_entry_sets);
298 
299 	if (br->br_flags & BR_FLAGS_ALIGNED)
300 		bufp = (volatile void *)&br->br_ring[i*ALIGN_SCALE].bre_ptr;
301 	else
302 		bufp = (volatile void *)&br->br_ring[i].bre_ptr;
303 
304 	if (buf == NULL) {
305 		DBG_COUNTER_INC(nq_entry_set_nulls);
306 		MPASS(*(void * volatile *)bufp != NULL);
307 	}
308 	*((void * volatile *)bufp) = buf;
309 }
310 
311 static inline void
brsc_gen_clear(struct buf_ring_sc * br)312 brsc_gen_clear(struct buf_ring_sc *br)
313 {
314 
315 	atomic_clear_acq_int(&br->br_prod_head, BR_RING_GEN);
316 }
317 
318 struct buf_ring_sc *
buf_ring_sc_alloc(int count,struct malloc_type * type,int flags,struct buf_ring_sc_consumer * brsc)319 buf_ring_sc_alloc(int count, struct malloc_type *type, int flags,
320 	struct buf_ring_sc_consumer *brsc)
321 {
322 	struct buf_ring_sc *br;
323 	int alloc_count = count;
324 
325 	KASSERT(powerof2(count), ("buf ring must be size power of 2"));
326 	if (brsc->brsc_flags & BR_FLAGS_ALIGNED)
327 		alloc_count = count*ALIGN_SCALE;
328 	br = malloc(sizeof(struct buf_ring) + alloc_count*sizeof(caddr_t),
329 	    type, flags|M_ZERO);
330 	if (br == NULL)
331 		return (NULL);
332 
333 	br->br_drain = brsc->brsc_drain;
334 	br->br_deferred = brsc->brsc_deferred;
335 	br->br_flags = brsc->brsc_flags;
336 	br->br_sc = brsc->brsc_sc;
337 	if (brsc->brsc_flags & BR_FLAGS_NUMA)
338 		br->br_domain = brsc->brsc_domain;
339 	else
340 		br->br_domain = BR_NODOMAIN;
341 	br->br_size = count;
342 	br->br_mask = count-1;
343 	br->br_prod_head = br->br_prod_tail = 0;
344 	br->br_cons = 0;
345 	br->br_enqueues = counter_u64_alloc(flags);
346 	br->br_drops = counter_u64_alloc(flags);
347 	br->br_abdications = counter_u64_alloc(flags);
348 	br->br_stalls = counter_u64_alloc(flags);
349 	br->br_starts = counter_u64_alloc(flags);
350 	br->br_restarts = counter_u64_alloc(flags);
351 	buf_ring_sc_reset_stats(br);
352 	return (br);
353 }
354 
355 void
buf_ring_sc_free(struct buf_ring_sc * br,struct malloc_type * type)356 buf_ring_sc_free(struct buf_ring_sc *br, struct malloc_type *type)
357 {
358 	counter_u64_free(br->br_enqueues);
359 	counter_u64_free(br->br_drops);
360 	counter_u64_free(br->br_abdications);
361 	counter_u64_free(br->br_stalls);
362 	counter_u64_free(br->br_starts);
363 	counter_u64_free(br->br_restarts);
364 
365 	free(br, type);
366 }
367 
368 void
buf_ring_sc_reset_stats(struct buf_ring_sc * br)369 buf_ring_sc_reset_stats(struct buf_ring_sc *br)
370 {
371 
372 	counter_u64_zero(br->br_enqueues);
373 	counter_u64_zero(br->br_drops);
374 	counter_u64_zero(br->br_abdications);
375 	counter_u64_zero(br->br_stalls);
376 	counter_u64_zero(br->br_starts);
377 	counter_u64_zero(br->br_restarts);
378 }
379 
380 void
buf_ring_sc_get_stats_v0(struct buf_ring_sc * br,struct buf_ring_sc_stats_v0 * brss)381 buf_ring_sc_get_stats_v0(struct buf_ring_sc *br, struct buf_ring_sc_stats_v0 *brss)
382 {
383 	brss->brs_enqueues = counter_u64_fetch(br->br_enqueues);
384 	brss->brs_drops = counter_u64_fetch(br->br_drops);
385 	brss->brs_abdications = counter_u64_fetch(br->br_abdications);
386 	brss->brs_stalls = counter_u64_fetch(br->br_stalls);
387 	brss->brs_starts = counter_u64_fetch(br->br_starts);
388 	brss->brs_restarts = counter_u64_fetch(br->br_restarts);
389 }
390 
391 static br_state
buf_ring_sc_drain_locked(struct buf_ring_sc * br,int budget)392 buf_ring_sc_drain_locked(struct buf_ring_sc *br, int budget)
393 {
394 	uint32_t cidx = BR_CONS_IDX(br);
395 	uint32_t pidx = br->br_prod_tail;
396 	uint32_t gen = BR_GEN(br);
397 	uint32_t n;
398 	int inuse, prod_avail;
399 	br_state state;
400 
401 	DBG_COUNTER_INC(drains);
402 
403 	KASSERT(budget > 0, ("calling drain with invalid budget"));
404 	MPASS(br->br_prod_head & BR_RING_OWNED);
405 	MPASS(br->br_owner == curthread);
406 	state = BR_IDLE;
407 	while (budget && (cidx != pidx || (gen && (pidx == cidx)))) {
408 		inuse = brsc_get_inuse(br, cidx, pidx, gen);
409 		prod_avail = min(inuse, budget);
410 		MPASS(prod_avail > 0);
411 		DBG_COUNTER_INC(drain_handled);
412 		MPASS(br->br_owner == curthread);
413 		n = br->br_drain(br, prod_avail, br->br_sc);
414 		MPASS(br->br_owner == curthread);
415 		KASSERT(n <= prod_avail, ("drain handler return invalid count"));
416 		if (n == 0) {
417 			state = BR_STALLED;
418 			break;
419 		}
420 
421 		buf_ring_sc_advance(br, n);
422 		budget -= n;
423 		if (budget == 0) {
424 			if (BR_CONS_IDX(br) != br->br_prod_tail)
425 				state = BR_ABDICATED;
426 			else
427 				state = BR_IDLE;
428 			break;
429 		}
430 		gen = BR_GEN(br);
431 		pidx = br->br_prod_tail;
432 		cidx = BR_CONS_IDX(br);
433 	}
434 
435 	return (state);
436 }
437 
438 void
buf_ring_sc_drain(struct buf_ring_sc * br,int budget)439 buf_ring_sc_drain(struct buf_ring_sc *br, int budget)
440 {
441 	br_state state;
442 	int pending;
443 
444 	KASSERT(budget >= 0, ("calling drain with invalid budget"));
445 	critical_enter();
446 	if (br->br_domain == PCPU_GET(domain))
447 		sched_pin();
448 	else if (br->br_domain != BR_NODOMAIN) {
449 		br->br_deferred(br, br->br_sc);
450 		critical_exit();
451 		return;
452 	}
453 	critical_exit();
454 
455 	if (__predict_false(budget == 0)) {
456 		buf_ring_sc_lock(br);
457 		budget = br->br_size;
458 	} else if (!buf_ring_sc_trylock(br)) {
459 		if (br->br_domain == PCPU_GET(domain))
460 			sched_unpin();
461 		return;
462 	}
463 
464 	state = buf_ring_sc_drain_locked(br, budget);
465 	if (br->br_domain == PCPU_GET(domain))
466 			sched_unpin();
467 	pending = buf_ring_sc_unlock(br, state);
468 	if ((state == BR_ABDICATED) && pending == 0)
469 		br->br_deferred(br, br->br_sc);
470 }
471 
472 /*
473  * Multi-producer safe lock-free ring buffer enqueue
474  *
475  * Most architectures do not support the atomic update of multiple
476  * discontiguous locations. So it is not possible to atomically update
477  * the producer index and ring buffer entry. To side-step this limitation
478  * we split update in to 3 steps:
479  *      1) atomically acquiring an index
480  *      2) updating the corresponding ring entry
481  *      3) making the update available to the consumer
482  * In order to split the index update in to an acquire and release
483  * phase there are _two_ producer indexes. 'prod_head' is used for
484  * step 1) and is thus only used by the enqueue itself. 'prod_tail'
485  * is used for step 3) to signal to the consumer that the update is
486  * complete. To guarantee memory ordering the update of 'prod_tail' is
487  * done with a atomic_store_rel_32(...) and the corresponding
488  * initial read of 'prod_tail' by the dequeue functions is done with
489  * an atomic_load_acq_32(...).
490  *
491  * Regarding memory ordering - there are five variables in question:
492  * (br_) prod_head, prod_tail, cons, ring[idx={cons, prod}]
493  * It's easiest examine correctness by considering the consequence of
494  * reading a stale value or having an update become visible prior to
495  * preceding writes.
496  *
497  * - prod_head: this is only read by the enqueue routine, if the latter were to
498  *   initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32)
499  *   would fail. However, the implied memory barrier in cmpxchg would cause the
500  *   subsequent read of prod_head to read the up-to-date value permitting the
501  *   cmpxchg to succeed the second time.
502  *
503  * - prod_tail: This value is used by dequeue to determine the effective
504  *   producer index. On architectures with weaker memory ordering than x86 it
505  *   needs special handling. In enqueue it needs to be updated with
506  *   atomic_store_rel_32() (i.e. a write memory barrier before update) to
507  *   guarantee that the new ring value is committed to memory before it is
508  *   made available by prod_tail. In dequeue to guarantee that it is read before
509  *   br_ring[cons] it needs to be read with atomic_load_acq_32().
510  *
511  *
512  * - cons: This is used to communicate the latest consumer index between
513  *   dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
514  *   to fail erroneously. To avoid a load being re-ordered after a store (and
515  *   thus permitting enqueue to store a new value before the old one has been
516  *   consumed) it is updated with an atomic_store_rel_32() in deqeueue.
517  *
518  * - ring[idx] : Updates to this value need to reach memory before the subsequent
519  *   update to prod_tail does. Reads need to happen before subsequent updates to
520  *   cons.
521  *
522  * Some implementation notes:
523  * - Much like a simpler single-producer single consumer ring buffer,
524  *   the producer can not produce faster than the consumer. Hence the
525  *   check of 'prod_head' + 1 against 'cons'.
526  *
527  * - The use of "prod_next = (prod_head + 1) & br->br_mask" to
528  *   calculate the next index is slightly cheaper than a modulo but
529  *   requires the ring to be power-of-2 sized.
530  *
531  * - The critical_enter() / critical_exit() are not required for
532  *   correctness. They prevent updates from stalling by having a producer be
533  *   preempted after updating 'prod_head' but before updating 'prod_tail'.
534  *
535  * - The "while (br->br_prod_tail != prod_head)"
536  *   check assures in order completion allows us to update
537  *   'prod_tail' without a cmpxchg / LOCK prefix assures in order
538  *   completion as a later producer might reach this point before an
539  *   earlier consumer.
540  *
541  *
542  *   This buf_ring has the following FSM:
543  *     producer:
544  *     -  !owned              -> owned(curthread) + enqueue
545  *     -  pending(!curthread) -> enqueue
546  *     -  owned + abdicating  -> owned + abdicating + pending(curthread)
547  *     -  owned + abdicating + pending(curthread) ->
548  *            owned + abdicating + pending(curthread) + enqueue
549  *     -  owned + abdicating + pending(curthread) + enqueue  ->
550  *           wait(!owned)
551  *     - !owned + abdicating + pending(curthread) ->
552  *        owned + busy + pending(curthread) ->
553  *        owned + busy (consumer)
554  *     consumer (i.e. owned(curthread)):
555  *      -  busy + owned -> abdicating + owned
556  *      -  abdicating + owned + pending -> abdicating + unowned + pending
557  *      -  abdicating + owned -> abdicating + unowned + enqueue tx task
558  *      How do we handle abdication when the ring is full
559  */
560 
561 int
buf_ring_sc_enqueue(struct buf_ring_sc * br,void * ents[],int count,int budget)562 buf_ring_sc_enqueue(struct buf_ring_sc *br, void *ents[], int count, int budget)
563 {
564 	uint32_t prod_head, prod_next, cons;
565 	uint32_t pidx, cidx, pidx_next;
566 	int i, pending, rc, avail, domainvalid;
567 #ifdef DEBUG_BUFRING
568 	int j;
569 	for (i = BR_CONS_IDX(br); i != ORDERED_LOAD_32(&br->br_prod_tail);
570 	     i = ((i + 1) & br->br_mask))
571 		for (j = 0; j < count; j++)
572 			if (brsc_entry_get(br, i) == ents[j])
573 				panic("buf=%p already enqueue at %d prod=%d cons=%d",
574 					  ents[j], i, br->br_prod_tail, BR_CONS_IDX(br));
575 #endif
576 	MPASS(count > 0);
577 	DBG_COUNTER_INC(nqs);
578 	critical_enter();
579 
580 	if (br->br_domain == BR_NODOMAIN || br->br_domain == PCPU_GET(domain))
581 		domainvalid = 1;
582 	prod_head = br->br_prod_head;
583 	pending = false;
584 	rc = 0;
585 
586 	/*
587 	 * If the current consumer abdicated and no one has set the pending bit yet
588 	 * we loop until we set it we're the next lock holder - or if the owner
589 	 * drops the lock before we can do that then the lock will be
590 	 * re-acquired normally
591 	 */
592 	while (BR_HANDOFF(br) && (prod_head & BR_RING_FLAGS_MASK) == BR_RING_OWNED && budget > 0 && domainvalid) {
593 		prod_head = br->br_prod_head;
594 		pidx = BR_INDEX(br, prod_head);
595 		cons = br->br_cons;
596 		cidx = BR_INDEX(br, cons);
597 		avail = brsc_get_avail(br, cidx, pidx, BR_GEN(br));
598 
599 		if (count > avail) {
600 			if (pidx != BR_INDEX(br, atomic_load_acq_32(&br->br_prod_head)) ||
601 			    cidx != BR_INDEX(br, atomic_load_acq_32(&br->br_cons)))
602 				continue;
603 			critical_exit();
604 			counter_u64_add(br->br_drops, 1);
605 			DBG_COUNTER_INC(nq_enobufs1);
606 			return (ENOBUFS);
607 		}
608 
609 		pidx_next = BR_INDEX(br, pidx + count);
610 		/* set the pending bit and preserve the owned bit */
611 		prod_next = pidx_next | BR_RING_PENDING | (prod_head & BR_RING_FLAGS_MASK);
612 		/* we wrapped set the gen bit to indicate that cidx==pidx => ring full */
613 		if (pidx_next < pidx)
614 			prod_next |= BR_RING_GEN;
615 		if (atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next)) {
616 			pending = true;
617 			DBG_COUNTER_INC(nq_pendings);
618 			goto done;
619 		}
620 		prod_head = br->br_prod_head;
621 	}
622 	do {
623 		rc = 0;
624 		prod_head = br->br_prod_head;
625 		pidx = BR_INDEX(br, prod_head);
626 		cidx = BR_INDEX(br, br->br_cons);
627 		avail = brsc_get_avail(br, cidx, pidx, BR_GEN(br));
628 
629 		if (count > avail) {
630 			/* ensure that we only return ENOBUFS
631 			 * if the latest value matches what we read
632 			 */
633 			if (pidx != BR_INDEX(br, atomic_load_acq_32(&br->br_prod_head)) ||
634 			    cidx != BR_INDEX(br, atomic_load_acq_32(&br->br_cons)))
635 				continue;
636 			critical_exit();
637 			counter_u64_add(br->br_drops, 1);
638 			DBG_COUNTER_INC(nq_enobufs2);
639 			return (ENOBUFS);
640 		}
641 		prod_next = pidx_next = BR_INDEX(br, pidx + count);
642 		/* preserve flags */
643 		prod_next |= (prod_head & BR_RING_FLAGS_MASK);
644 
645 		/* we wrapped - set the gen bit to indicate that cidx==pidx => ring full */
646 		if (pidx_next < pidx)
647 			prod_next |= BR_RING_GEN;
648 		/*
649 		 * If the ring is unowned, not pending, the budget is non-zero, and we're
650 		 * in the right domain, try to acquire the lock.
651 		 */
652 		if ((prod_head & BR_RING_FLAGS_MASK) == 0 && budget > 0 && domainvalid) {
653 			prod_next |= BR_RING_OWNED;
654 			DBG_COUNTER_INC(nq_eowned);
655 			rc = EOWNED;
656 		}
657 	} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
658 	done:
659 	for (i = 0; i < count; i++) {
660 		MPASS(ents[i] != NULL);
661 		brsc_entry_set(br, (BR_INDEX(br, prod_head)-(count-1))+i, ents[i]);
662 	}
663 	/*
664 	 * If there are other enqueues in progress
665 	 * that preceded us, we need to wait for them
666 	 * to complete
667 	 * re-ordering of reads would not effect correctness
668 	 */
669 	while (br->br_prod_tail != BR_INDEX(br, prod_head))
670 		cpu_spinwait();
671 	/* ensure  that the ring update reaches memory before the new
672 	 * value of prod_tail
673 	 */
674 	atomic_store_rel_32(&br->br_prod_tail, BR_INDEX(br, prod_next));
675 
676 	/* now that we've completed the enqueue if we know that we're
677 	 * the next owner we need to wait for the current owner to clear
678 	 * the owned bit
679 	 */
680 	if (pending) {
681 		while ((br->br_prod_head & BR_RING_OWNED) == BR_RING_OWNED)
682 			cpu_spinwait();
683 		atomic_set_acq_32(&br->br_prod_head, BR_RING_OWNED);
684 		DBG_COUNTER_INC(nq_eowned);
685 		rc = EOWNED;
686 	}
687 	if (rc == EOWNED) {
688 		MPASS(br->br_owner == NULL);
689 		MPASS((br->br_prod_head & BR_RING_OWNED) == BR_RING_OWNED);
690 		/* clear the flags bits from cons */
691 		br->br_cons &= ~BR_RING_FLAGS_MASK;
692 		br->br_owner = curthread;
693 		if (br->br_domain == PCPU_GET(domain))
694 			sched_pin();
695 	}
696     /*
697 	 * we became owner by way of the contested abdicate clear pending
698 	 */
699 	if (pending)
700 		atomic_clear_rel_32(&br->br_prod_head, BR_RING_PENDING);
701 
702 	critical_exit();
703 	counter_u64_add(br->br_enqueues, 1);
704 
705 	/* we're the owner - our buffers were enqueued */
706 	if (rc == EOWNED) {
707 		br_state state = buf_ring_sc_drain_locked(br, budget);
708 		if (br->br_domain == PCPU_GET(domain))
709 			sched_unpin();
710 		if (buf_ring_sc_unlock(br, state) == 0 && state == BR_ABDICATED) {
711 			DBG_COUNTER_INC(deferred);
712 			br->br_deferred(br, br->br_sc); /* consumer's re-drive mechanism */
713 		}
714 	}
715 #if 0
716 	else if (BR_STALLED(br)) {
717 		/* buffer enqueued - but we can't do any work */
718 		return (ESTALLED);
719 	}
720 #endif
721 	return (0);
722 }
723 
724 /*
725  * populate ents with up to count values from the ring
726  * and return the number of entries
727  */
728 int
buf_ring_sc_peek(struct buf_ring_sc * br,void * ents[],uint16_t count)729 buf_ring_sc_peek(struct buf_ring_sc *br, void *ents[], uint16_t count)
730 {
731 	uint32_t cons;
732 	uint32_t prod_tail;
733 	int i, inuse, prod_avail;
734 
735 	KASSERT(count > 0, ("peeking for zero entries"));
736 	KASSERT((br->br_prod_head & BR_RING_OWNED) == BR_RING_OWNED, ("peeking without lock being held"));
737 	MPASS(br->br_owner == curthread);
738 	/*
739 	 * for correctness prod_tail must be read before ring[cons]
740 	 */
741 	cons = BR_CONS_IDX(br);
742 	prod_tail = ORDERED_LOAD_32(&br->br_prod_tail);
743 	if ((inuse = brsc_get_inuse(br, cons, prod_tail, BR_GEN(br))) == 0)
744 		return (0);
745 
746 	prod_avail = min(inuse, count);
747 	for (i = 0; i < prod_avail; i++) {
748 		DBG_COUNTER_INC(nq_entry_gets);
749 		ents[i] = brsc_entry_get(br, BR_INDEX(br, cons + i));
750 		MPASS(ents[i] != NULL);
751 	}
752 	return (prod_avail);
753 }
754 
755 /*
756  * Used to return a buffer (most likely already there)
757  * to the top od the ring. The caller should *not*
758  * have used any dequeue to pull it out of the ring
759  * but instead should have used the peek() function.
760  * This is normally used where the transmit queue
761  * of a driver is full, and an mubf must be returned.
762  * Most likely whats in the ring-buffer is what
763  * is being put back (since it was not removed), but
764  * sometimes the lower transmit function may have
765  * done a pullup or other function that will have
766  * changed it. As an optimization we always put it
767  * back (since jhb says the store is probably cheaper),
768  * if we have to do a multi-queue version we will need
769  * the compare and an atomic.
770  *
771  */
772 void
buf_ring_sc_putback(struct buf_ring_sc * br,void * new,int idx)773 buf_ring_sc_putback(struct buf_ring_sc *br, void *new, int idx)
774 {
775 	KASSERT(BR_CONS_IDX(br) != br->br_prod_tail,
776 			("Buf-Ring has none in putback")) ;
777 	brsc_entry_set(br, BR_CONS_IDX(br) + idx, new);
778 }
779 
780 /*
781  * @count: the number of entries by which to advance the consumer index
782  *
783  */
784 static void
buf_ring_sc_advance(struct buf_ring_sc * br,int count)785 buf_ring_sc_advance(struct buf_ring_sc *br, int count)
786 {
787 	uint32_t cons, cons_next;
788 	uint32_t prod_tail;
789 	int i;
790 
791 	KASSERT(count > 0, ("invalid advance count"));
792 	MPASS(br->br_owner == curthread);
793 
794 	DBG_COUNTER_INC(nq_entry_advances);
795 
796 	cons = BR_CONS_IDX(br);
797 	prod_tail = br->br_prod_tail;
798 	cons_next = BR_INDEX(br, cons + count);
799 
800 	/*
801 	 * Storing NULL here serves two purposes:
802 	 * 1) it assures that the load of ring[cons] has completed
803 	 *    (only the most perverted architecture or compiler would
804 	 *    consider re-ordering a = *x; *x = b)
805 	 * 2) it allows us to enforce global ordering of the cons
806 	 *    update with an atomic_store_rel_32
807 	 */
808 	for (i = 0; i < count; i++)
809 		brsc_entry_set(br, BR_INDEX(br, cons + i), NULL);
810 
811 	if (cons_next < cons) {
812 		MPASS(cons_next <= prod_tail);
813 		brsc_gen_clear(br);
814 	}
815 	atomic_store_rel_32(&br->br_cons, cons_next);
816 
817 }
818 
819 /*
820  * mark the ring as being abdicated
821  */
822 void
buf_ring_sc_abdicate(struct buf_ring_sc * br)823 buf_ring_sc_abdicate(struct buf_ring_sc *br)
824 {
825 	uint32_t cons_next;
826 
827 	cons_next = br->br_cons | BR_RING_ABDICATING;
828 	counter_u64_add(br->br_abdications, 1);
829 	critical_enter();
830 
831 	atomic_store_rel_32(&br->br_cons, cons_next);
832 }
833 
834 int
buf_ring_sc_count(struct buf_ring_sc * br)835 buf_ring_sc_count(struct buf_ring_sc *br)
836 {
837 	/*  br_cons and br_prod_tail may be stale but the consumer
838 	 * understands that this is only a point in time snapshot
839 	 */
840 
841 	return (brsc_get_inuse(br, BR_INDEX(br, br->br_cons), br->br_prod_tail, BR_GEN(br)));
842 }
843 
844 int
buf_ring_sc_empty(struct buf_ring_sc * br)845 buf_ring_sc_empty(struct buf_ring_sc *br)
846 {
847 	/*  br_prod_tail may be stale but the consumer understands that this is
848 	*  only a point in time snapshot
849 	*/
850 
851 	return ((BR_GEN(br) == 0) && BR_INDEX(br, br->br_cons) == br->br_prod_tail);
852 }
853 
854 int
buf_ring_sc_full(struct buf_ring_sc * br)855 buf_ring_sc_full(struct buf_ring_sc *br)
856 {
857 	/* br_cons may be stale but the caller understands that this is
858 	* only a point in time snapshot
859 	*/
860 	return (BR_GEN(br) && (BR_INDEX(br, br->br_cons) == br->br_prod_tail));
861 }
862 
863 /*
864  * currently being used only for flushing all buffers
865  */
866 static void
buf_ring_sc_lock(struct buf_ring_sc * br)867 buf_ring_sc_lock(struct buf_ring_sc *br)
868 {
869 	uint32_t value;
870 
871 	do {
872 		while ((value = br->br_prod_head) & BR_RING_PENDING)
873 			cpu_spinwait();
874 	} while (!atomic_cmpset_acq_32(&br->br_prod_head, value, value | BR_RING_PENDING));
875 	do {
876 		while ((value = br->br_prod_head) & BR_RING_OWNED)
877 			cpu_spinwait();
878 	} while (!atomic_cmpset_acq_32(&br->br_prod_head, value, value | BR_RING_OWNED));
879 	br->br_owner = curthread;
880 	if (br->br_cons & BR_RING_IDLE)
881 		counter_u64_add(br->br_starts, 1);
882 	else if (br->br_cons & BR_RING_STALLED)
883 		counter_u64_add(br->br_restarts, 1);
884 	br->br_cons &= ~(BR_RING_IDLE|BR_RING_ABDICATING|BR_RING_STALLED);
885 	atomic_clear_rel_32(&br->br_prod_head, BR_RING_PENDING);
886 }
887 
888 
889 static int
buf_ring_sc_trylock(struct buf_ring_sc * br)890 buf_ring_sc_trylock(struct buf_ring_sc *br)
891 {
892 	uint32_t value;
893 
894 	do {
895 		value = br->br_prod_head;
896 		if (value & (BR_RING_OWNED|BR_RING_PENDING))
897 			return (0);
898 	} while (!atomic_cmpset_acq_32(&br->br_prod_head, value, value | BR_RING_OWNED | BR_RING_PENDING));
899 
900 	MPASS(br->br_owner == NULL);
901 	MPASS((br->br_prod_head & (BR_RING_OWNED|BR_RING_PENDING)) == (BR_RING_OWNED|BR_RING_PENDING));
902 	br->br_cons &= ~(BR_RING_IDLE|BR_RING_ABDICATING|BR_RING_STALLED);
903 	atomic_clear_rel_32(&br->br_prod_head, BR_RING_PENDING);
904 	br->br_owner = curthread;
905 	if (br->br_cons & BR_RING_IDLE)
906 		counter_u64_add(br->br_starts, 1);
907 	else if (br->br_cons & BR_RING_STALLED)
908 		counter_u64_add(br->br_restarts, 1);
909 
910 	return (1);
911 }
912 
913 static int
buf_ring_sc_unlock(struct buf_ring_sc * br,br_state reason)914 buf_ring_sc_unlock(struct buf_ring_sc *br, br_state reason)
915 {
916 	uint32_t prod_head, cons_next;
917 	int pending;
918 
919 	KASSERT(br->br_prod_head & BR_RING_OWNED, ("unlocking unowned ring"));
920 	/*
921 	 * we treat IDLE the same as ABDICATE to avoid a race
922 	 * with enqueue - they only differ for purposes of stats
923 	 * keeping
924 	 */
925 	if (reason == BR_IDLE) {
926 		cons_next = br->br_cons | BR_RING_IDLE;
927 		critical_enter();
928 		atomic_store_rel_32(&br->br_cons, cons_next);
929 	} else if ((reason == BR_ABDICATED) &&
930 			   (br->br_cons & BR_RING_ABDICATING) == 0) {
931 		cons_next = br->br_cons | BR_RING_ABDICATING;
932 		counter_u64_add(br->br_abdications, 1);
933 		critical_enter();
934 		atomic_store_rel_32(&br->br_cons, cons_next);
935 	} else if (reason == BR_STALLED) {
936 		cons_next = br->br_cons | BR_RING_STALLED;
937 		counter_u64_add(br->br_stalls, 1);
938 		critical_enter();
939 		atomic_store_rel_32(&br->br_cons, cons_next);
940 	} else
941 		panic("invalid unlock state");
942 	br->br_owner = NULL;
943 	do {
944 		prod_head = br->br_prod_head;
945 		pending = !!(prod_head & BR_RING_PENDING);
946 	} while (!atomic_cmpset_rel_32(&br->br_prod_head, prod_head, prod_head & ~BR_RING_OWNED));
947 	critical_exit();
948 	return (pending);
949 }
950