1 /*
2 * CDDL HEADER START
3 *
4 * This file and its contents are supplied under the terms of the
5 * Common Development and Distribution License ("CDDL"), version 1.0.
6 * You may only use this file in accordance with the terms of version
7 * 1.0 of the CDDL.
8 *
9 * A full copy of the text of the CDDL should have accompanied this
10 * source. A copy of the CDDL is also available via the Internet at
11 * http://www.illumos.org/license/CDDL.
12 *
13 * CDDL HEADER END
14 */
15 /*
16 * Copyright (c) 2014, 2018 by Delphix. All rights reserved.
17 */
18
19 #include <sys/bqueue.h>
20 #include <sys/zfs_context.h>
21
22 static inline bqueue_node_t *
obj2node(bqueue_t * q,void * data)23 obj2node(bqueue_t *q, void *data)
24 {
25 return ((bqueue_node_t *)((char *)data + q->bq_node_offset));
26 }
27
/*
 * Initialize a blocking queue. The maximum capacity of the queue is set to
 * size. Types that are stored in a bqueue must contain a bqueue_node_t,
 * and node_offset must be its offset from the start of the struct.
 * fill_fraction is a performance tuning value; when the queue is full, any
 * threads attempting to enqueue records will block. They will block until
 * they're signaled, which will occur when the queue is at least
 * 1/fill_fraction empty. Similar behavior occurs on dequeue; if the queue is
 * empty, threads block. They will be signaled when the queue is at least
 * 1/fill_fraction full, or when bqueue_enqueue_flush() is called. As a
 * result, you must use bqueue_enqueue_flush() when you enqueue your final
 * record on a thread, in case the dequeueing threads are currently blocked
 * and that enqueue does not cause them to be awoken. Alternatively, this
 * behavior can be disabled (causing signaling to happen immediately) by
 * setting fill_fraction to any value larger than size.
 * Return 0 on success, or -1 on failure (fill_fraction is 0).
 */
44 int
bqueue_init(bqueue_t * q,uint_t fill_fraction,size_t size,size_t node_offset)45 bqueue_init(bqueue_t *q, uint_t fill_fraction, size_t size, size_t node_offset)
46 {
47 if (fill_fraction == 0) {
48 return (-1);
49 }
50 list_create(&q->bq_list, node_offset + sizeof (bqueue_node_t),
51 node_offset + offsetof(bqueue_node_t, bqn_node));
52 cv_init(&q->bq_add_cv, NULL, CV_DEFAULT, NULL);
53 cv_init(&q->bq_pop_cv, NULL, CV_DEFAULT, NULL);
54 mutex_init(&q->bq_lock, NULL, MUTEX_DEFAULT, NULL);
55 q->bq_node_offset = node_offset;
56 q->bq_size = 0;
57 q->bq_maxsize = size;
58 q->bq_fill_fraction = fill_fraction;
59 return (0);
60 }
61
62 /*
63 * Destroy a blocking queue. This function asserts that there are no
64 * elements in the queue, and no one is blocked on the condition
65 * variables.
66 */
67 void
bqueue_destroy(bqueue_t * q)68 bqueue_destroy(bqueue_t *q)
69 {
70 mutex_enter(&q->bq_lock);
71 ASSERT0(q->bq_size);
72 cv_destroy(&q->bq_add_cv);
73 cv_destroy(&q->bq_pop_cv);
74 list_destroy(&q->bq_list);
75 mutex_exit(&q->bq_lock);
76 mutex_destroy(&q->bq_lock);
77 }
78
79 static void
bqueue_enqueue_impl(bqueue_t * q,void * data,size_t item_size,boolean_t flush)80 bqueue_enqueue_impl(bqueue_t *q, void *data, size_t item_size, boolean_t flush)
81 {
82 ASSERT3U(item_size, >, 0);
83 ASSERT3U(item_size, <=, q->bq_maxsize);
84 mutex_enter(&q->bq_lock);
85 obj2node(q, data)->bqn_size = item_size;
86 while (q->bq_size && q->bq_size + item_size > q->bq_maxsize) {
87 /*
88 * Wake up bqueue_dequeue() thread if already sleeping in order
89 * to prevent the deadlock condition
90 */
91 cv_signal(&q->bq_pop_cv);
92 cv_wait_sig(&q->bq_add_cv, &q->bq_lock);
93 }
94 q->bq_size += item_size;
95 list_insert_tail(&q->bq_list, data);
96 if (flush)
97 cv_broadcast(&q->bq_pop_cv);
98 else if (q->bq_size >= q->bq_maxsize / q->bq_fill_fraction)
99 cv_signal(&q->bq_pop_cv);
100 mutex_exit(&q->bq_lock);
101 }
102
/*
 * Add data to q, consuming item_size units of capacity. If there is
 * insufficient capacity to consume item_size units, block until capacity
 * exists. Asserts that item_size is > 0 and no larger than the queue's
 * maximum size.
 */
108 void
bqueue_enqueue(bqueue_t * q,void * data,size_t item_size)109 bqueue_enqueue(bqueue_t *q, void *data, size_t item_size)
110 {
111 bqueue_enqueue_impl(q, data, item_size, B_FALSE);
112 }
113
/*
 * Enqueue an entry, and then flush the queue. This forces the popping threads
 * to wake up, even if we're below the fill fraction. We have this in a single
 * function, rather than having a separate call, because it prevents a race
 * between the enqueueing thread and the dequeueing thread, in which the
 * enqueueing thread wakes up the dequeueing thread and that thread destroys
 * the condvar before the enqueueing thread is done.
 */
122 void
bqueue_enqueue_flush(bqueue_t * q,void * data,size_t item_size)123 bqueue_enqueue_flush(bqueue_t *q, void *data, size_t item_size)
124 {
125 bqueue_enqueue_impl(q, data, item_size, B_TRUE);
126 }
127
128 /*
129 * Take the first element off of q. If there are no elements on the queue, wait
130 * until one is put there. Return the removed element.
131 */
132 void *
bqueue_dequeue(bqueue_t * q)133 bqueue_dequeue(bqueue_t *q)
134 {
135 void *ret = NULL;
136 size_t item_size;
137 mutex_enter(&q->bq_lock);
138 while (q->bq_size == 0) {
139 cv_wait_sig(&q->bq_pop_cv, &q->bq_lock);
140 }
141 ret = list_remove_head(&q->bq_list);
142 ASSERT3P(ret, !=, NULL);
143 item_size = obj2node(q, ret)->bqn_size;
144 q->bq_size -= item_size;
145 if (q->bq_size <= q->bq_maxsize - (q->bq_maxsize / q->bq_fill_fraction))
146 cv_signal(&q->bq_add_cv);
147 mutex_exit(&q->bq_lock);
148 return (ret);
149 }
150
151 /*
152 * Returns true if the space used is 0.
153 */
154 boolean_t
bqueue_empty(bqueue_t * q)155 bqueue_empty(bqueue_t *q)
156 {
157 return (q->bq_size == 0);
158 }
159