1 /*        $NetBSD: threadpool.c,v 1.6 2024/02/02 21:52:23 andvar Exp $          */
2 
3 /*-
4  * Copyright (c) 2018 The NetBSD Foundation, Inc.
5  * All rights reserved.
6  *
7  * This code is derived from software contributed to The NetBSD Foundation
8  * by Jason R. Thorpe.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND
20  * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES,
21  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
22  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
23  * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY
24  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
26  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
28  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
30  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #include <sys/cdefs.h>
34 #if !defined(lint)
35 __RCSID("$NetBSD: threadpool.c,v 1.6 2024/02/02 21:52:23 andvar Exp $");
36 #endif /* !lint */
37 
38 #include <sys/param.h>
39 #include <sys/condvar.h>
40 #include <sys/kernel.h>
41 #include <sys/kmem.h>
42 #include <sys/mutex.h>
43 #include <sys/threadpool.h>
44 
45 #include "kernspace.h"
46 
47 void
rumptest_threadpool_unbound_lifecycle(void)48 rumptest_threadpool_unbound_lifecycle(void)
49 {
50           struct threadpool *pool0, *pool1, *pool2;
51           int error;
52 
53           error = threadpool_get(&pool0, PRI_NONE);
54           KASSERT(error == 0);
55 
56           error = threadpool_get(&pool1, PRI_NONE);
57           KASSERT(error == 0);
58 
59           KASSERT(pool0 == pool1);
60 
61           error = threadpool_get(&pool2, PRI_KERNEL_RT);
62           KASSERT(error == 0);
63 
64           KASSERT(pool0 != pool2);
65 
66           threadpool_put(pool0, PRI_NONE);
67           threadpool_put(pool1, PRI_NONE);
68           threadpool_put(pool2, PRI_KERNEL_RT);
69 }
70 
71 void
rumptest_threadpool_percpu_lifecycle(void)72 rumptest_threadpool_percpu_lifecycle(void)
73 {
74           struct threadpool_percpu *pcpu0, *pcpu1, *pcpu2;
75           int error;
76 
77           error = threadpool_percpu_get(&pcpu0, PRI_NONE);
78           KASSERT(error == 0);
79 
80           error = threadpool_percpu_get(&pcpu1, PRI_NONE);
81           KASSERT(error == 0);
82 
83           KASSERT(pcpu0 == pcpu1);
84 
85           error = threadpool_percpu_get(&pcpu2, PRI_KERNEL_RT);
86           KASSERT(error == 0);
87 
88           KASSERT(pcpu0 != pcpu2);
89 
90           threadpool_percpu_put(pcpu0, PRI_NONE);
91           threadpool_percpu_put(pcpu1, PRI_NONE);
92           threadpool_percpu_put(pcpu2, PRI_KERNEL_RT);
93 }
94 
95 struct test_job_data {
96           kmutex_t mutex;
97           kcondvar_t cond;
98           unsigned int count;
99           struct threadpool_job job;
100 };
101 
102 #define   FINAL_COUNT         12345
103 
104 static void
test_job_func_schedule(struct threadpool_job * job)105 test_job_func_schedule(struct threadpool_job *job)
106 {
107           struct test_job_data *data =
108               container_of(job, struct test_job_data, job);
109 
110           mutex_enter(&data->mutex);
111           KASSERT(data->count != FINAL_COUNT);
112           data->count++;
113           cv_broadcast(&data->cond);
114           threadpool_job_done(job);
115           mutex_exit(&data->mutex);
116 }
117 
118 static void
test_job_func_cancel(struct threadpool_job * job)119 test_job_func_cancel(struct threadpool_job *job)
120 {
121           struct test_job_data *data =
122               container_of(job, struct test_job_data, job);
123 
124           mutex_enter(&data->mutex);
125           if (data->count == 0) {
126                     data->count = 1;
127                     cv_broadcast(&data->cond);
128           }
129           while (data->count != FINAL_COUNT - 1)
130                     cv_wait(&data->cond, &data->mutex);
131           data->count = FINAL_COUNT;
132           cv_broadcast(&data->cond);
133           threadpool_job_done(job);
134           mutex_exit(&data->mutex);
135 }
136 
137 static void
init_test_job_data(struct test_job_data * data,threadpool_job_fn_t fn)138 init_test_job_data(struct test_job_data *data, threadpool_job_fn_t fn)
139 {
140           mutex_init(&data->mutex, MUTEX_DEFAULT, IPL_NONE);
141           cv_init(&data->cond, "testjob");
142           threadpool_job_init(&data->job, fn, &data->mutex, "testjob");
143           data->count = 0;
144 }
145 
146 static void
fini_test_job_data(struct test_job_data * data)147 fini_test_job_data(struct test_job_data *data)
148 {
149           threadpool_job_destroy(&data->job);
150           cv_destroy(&data->cond);
151           mutex_destroy(&data->mutex);
152 }
153 
154 void
rumptest_threadpool_unbound_schedule(void)155 rumptest_threadpool_unbound_schedule(void)
156 {
157           struct test_job_data data;
158           struct threadpool *pool;
159           int error;
160 
161           error = threadpool_get(&pool, PRI_NONE);
162           KASSERT(error == 0);
163 
164           init_test_job_data(&data, test_job_func_schedule);
165 
166           mutex_enter(&data.mutex);
167           while (data.count != FINAL_COUNT) {
168                     threadpool_schedule_job(pool, &data.job);
169                     error = cv_timedwait(&data.cond, &data.mutex, hz * 2);
170                     KASSERT(error != EWOULDBLOCK);
171           }
172           mutex_exit(&data.mutex);
173 
174           fini_test_job_data(&data);
175 
176           threadpool_put(pool, PRI_NONE);
177 }
178 
179 void
rumptest_threadpool_percpu_schedule(void)180 rumptest_threadpool_percpu_schedule(void)
181 {
182           struct test_job_data data;
183           struct threadpool_percpu *pcpu;
184           struct threadpool *pool;
185           int error;
186 
187           error = threadpool_percpu_get(&pcpu, PRI_NONE);
188           KASSERT(error == 0);
189 
190           pool = threadpool_percpu_ref(pcpu);
191 
192           init_test_job_data(&data, test_job_func_schedule);
193 
194           mutex_enter(&data.mutex);
195           while (data.count != FINAL_COUNT) {
196                     threadpool_schedule_job(pool, &data.job);
197                     error = cv_timedwait(&data.cond, &data.mutex, hz * 2);
198                     KASSERT(error != EWOULDBLOCK);
199           }
200           mutex_exit(&data.mutex);
201 
202           fini_test_job_data(&data);
203 
204           threadpool_percpu_put(pcpu, PRI_NONE);
205 }
206 
207 void
rumptest_threadpool_job_cancel(void)208 rumptest_threadpool_job_cancel(void)
209 {
210           struct test_job_data data;
211           struct threadpool *pool;
212           int error;
213           bool rv;
214 
215           error = threadpool_get(&pool, PRI_NONE);
216           KASSERT(error == 0);
217 
218           init_test_job_data(&data, test_job_func_cancel);
219 
220           mutex_enter(&data.mutex);
221           threadpool_schedule_job(pool, &data.job);
222           while (data.count == 0)
223                     cv_wait(&data.cond, &data.mutex);
224           KASSERT(data.count == 1);
225 
226           /* Job is already running (and is not finished); this should fail. */
227           rv = threadpool_cancel_job_async(pool, &data.job);
228           KASSERT(rv == false);
229 
230           data.count = FINAL_COUNT - 1;
231           cv_broadcast(&data.cond);
232 
233           /* Now wait for the job to finish. */
234           threadpool_cancel_job(pool, &data.job);
235           KASSERT(data.count == FINAL_COUNT);
236           mutex_exit(&data.mutex);
237 
238           fini_test_job_data(&data);
239 
240           threadpool_put(pool, PRI_NONE);
241 }
242 
243 void
rumptest_threadpool_job_cancelthrash(void)244 rumptest_threadpool_job_cancelthrash(void)
245 {
246           struct test_job_data data;
247           struct threadpool *pool;
248           int i, error;
249 
250           error = threadpool_get(&pool, PRI_NONE);
251           KASSERT(error == 0);
252 
253           init_test_job_data(&data, test_job_func_cancel);
254 
255           mutex_enter(&data.mutex);
256           for (i = 0; i < 10000; i++) {
257                     threadpool_schedule_job(pool, &data.job);
258                     if ((i % 3) == 0) {
259                               mutex_exit(&data.mutex);
260                               mutex_enter(&data.mutex);
261                     }
262                     /*
263                      * If the job managed to start, ensure that its exit
264                      * condition is met so that we don't wait forever
265                      * for the job to finish.
266                      */
267                     data.count = FINAL_COUNT - 1;
268                     cv_broadcast(&data.cond);
269 
270                     threadpool_cancel_job(pool, &data.job);
271 
272                     /*
273                      * After cancellation, either the job didn't start
274                      * (data.count == FINAL_COUNT - 1, per above) or
275                      * it finished (data.count == FINAL_COUNT).
276                      */
277                     KASSERT(data.count == (FINAL_COUNT - 1) ||
278                         data.count == FINAL_COUNT);
279 
280                     /* Reset for the loop. */
281                     data.count = 0;
282           }
283           mutex_exit(&data.mutex);
284 
285           fini_test_job_data(&data);
286 
287           threadpool_put(pool, PRI_NONE);
288 }
289