xref: /dragonfly/usr.bin/dsynth/bulk.c (revision 33c3dcc355044cebdc1e32dd30af4c6307dee5b7)
1 /*
2  * Copyright (c) 2019 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * This code uses concepts and configuration based on 'synth', by
8  * John R. Marino <draco@marino.st>, which was written in ada.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  * 1. Redistributions of source code must retain the above copyright
15  *    notice, this list of conditions and the following disclaimer.
16  * 2. Redistributions in binary form must reproduce the above copyright
17  *    notice, this list of conditions and the following disclaimer in
18  *    the documentation and/or other materials provided with the
19  *    distribution.
20  * 3. Neither the name of The DragonFly Project nor the names of its
21  *    contributors may be used to endorse or promote products derived
22  *    from this software without specific, prior written permission.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
25  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
26  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
27  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
28  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
29  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
30  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
31  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
32  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
33  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
34  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35  * SUCH DAMAGE.
36  */
37 #include "dsynth.h"
38 
39 typedef struct job {
40           pthread_t td;
41           pthread_cond_t cond;
42           bulk_t    *active;
43           int terminate : 1;
44 } job_t;
45 
46 /*
47  * Most of these globals are locked with BulkMutex
48  */
49 static int BulkScanJob;
50 static int BulkCurJobs;
51 static int BulkMaxJobs;
52 static job_t JobsAry[MAXBULK];
53 static void (*BulkFunc)(bulk_t *bulk);
54 static bulk_t *BulkSubmit;
55 static bulk_t **BulkSubmitTail = &BulkSubmit;
56 static bulk_t *BulkResponse;
57 static bulk_t **BulkResponseTail = &BulkResponse;
58 static pthread_cond_t BulkResponseCond;
59 static pthread_mutex_t BulkMutex;
60 
61 static void bulkstart(void);
62 #if 0
63 static int readall(int fd, void *buf, size_t bytes);
64 static int writeall(int fd, const void *buf, size_t bytes);
65 #endif
66 static void *bulkthread(void *arg);
67 
68 /*
69  * Initialize for bulk scan operations.  Always paired with donebulk()
70  */
71 void
initbulk(void (* func)(bulk_t * bulk),int jobs)72 initbulk(void (*func)(bulk_t *bulk), int jobs)
73 {
74           int i;
75 
76           if (jobs > MAXBULK)
77                     jobs = MAXBULK;
78 
79           ddassert(BulkSubmit == NULL);
80           BulkCurJobs = 0;
81           BulkMaxJobs = jobs;
82           BulkFunc = func;
83           BulkScanJob = 0;
84 
85           addbuildenv("__MAKE_CONF", "/dev/null",
86                         BENV_ENVIRONMENT | BENV_PKGLIST);
87 
88           /*
89            * CCache is a horrible unreliable hack but... leave the
90            * mechanism in-place in case someone has a death wish.
91            */
92           if (UseCCache) {
93                     addbuildenv("WITH_CCACHE_BUILD", "yes", BENV_MAKECONF);
94                     addbuildenv("CCACHE_DIR", CCachePath, BENV_MAKECONF);
95           }
96 
97           pthread_mutex_init(&BulkMutex, NULL);
98           pthread_cond_init(&BulkResponseCond, NULL);
99 
100           pthread_mutex_lock(&BulkMutex);
101           for (i = 0; i < jobs; ++i) {
102                     pthread_cond_init(&JobsAry[i].cond, NULL);
103                     pthread_create(&JobsAry[i].td, NULL, bulkthread, &JobsAry[i]);
104           }
105           pthread_mutex_unlock(&BulkMutex);
106 }
107 
108 void
donebulk(void)109 donebulk(void)
110 {
111           bulk_t *bulk;
112           int i;
113 
114           pthread_mutex_lock(&BulkMutex);
115           while ((bulk = BulkSubmit) != NULL) {
116                     BulkSubmit = bulk->next;
117                     freebulk(bulk);
118           }
119           BulkSubmitTail = &BulkSubmit;
120 
121           for (i = 0; i < BulkMaxJobs; ++i) {
122                     JobsAry[i].terminate = 1;
123                     pthread_cond_signal(&JobsAry[i].cond);
124           }
125           pthread_mutex_unlock(&BulkMutex);
126           for (i = 0; i < BulkMaxJobs; ++i) {
127                     pthread_join(JobsAry[i].td, NULL);
128                     pthread_cond_destroy(&JobsAry[i].cond);
129                     if (JobsAry[i].active) {
130                               freebulk(JobsAry[i].active);
131                               JobsAry[i].active = NULL;
132                               pthread_mutex_lock(&BulkMutex);
133                               --BulkCurJobs;
134                               pthread_mutex_unlock(&BulkMutex);
135                     }
136                     JobsAry[i].terminate = 0;
137           }
138           ddassert(BulkCurJobs == 0);
139 
140           while ((bulk = BulkResponse) != NULL) {
141                     BulkResponse = bulk->next;
142                     freebulk(bulk);
143           }
144           BulkResponseTail = &BulkResponse;
145 
146           BulkFunc = NULL;
147 
148           bzero(JobsAry, sizeof(JobsAry));
149 
150           if (UseCCache) {
151                     delbuildenv("WITH_CCACHE_BUILD");
152                     delbuildenv("CCACHE_DIR");
153           }
154           delbuildenv("__MAKE_CONF");
155 }
156 
157 void
queuebulk(const char * s1,const char * s2,const char * s3,const char * s4)158 queuebulk(const char *s1, const char *s2, const char *s3, const char *s4)
159 {
160           bulk_t *bulk;
161 
162           bulk = calloc(1, sizeof(*bulk));
163           if (s1)
164                     bulk->s1 = strdup(s1);
165           if (s2)
166                     bulk->s2 = strdup(s2);
167           if (s3)
168                     bulk->s3 = strdup(s3);
169           if (s4)
170                     bulk->s4 = strdup(s4);
171           bulk->state = ONSUBMIT;
172 
173           pthread_mutex_lock(&BulkMutex);
174           *BulkSubmitTail = bulk;
175           BulkSubmitTail = &bulk->next;
176           if (BulkCurJobs < BulkMaxJobs) {
177                     pthread_mutex_unlock(&BulkMutex);
178                     bulkstart();
179           } else {
180                     pthread_mutex_unlock(&BulkMutex);
181           }
182 }
183 
184 /*
185  * Fill any idle job slots with new jobs as available.
186  */
187 static
188 void
bulkstart(void)189 bulkstart(void)
190 {
191           bulk_t *bulk;
192           int i;
193 
194           pthread_mutex_lock(&BulkMutex);
195           while ((bulk = BulkSubmit) != NULL && BulkCurJobs < BulkMaxJobs) {
196                     i = BulkScanJob + 1;
197                     for (;;) {
198                               i = i % BulkMaxJobs;
199                               if (JobsAry[i].active == NULL)
200                                         break;
201                               ++i;
202                     }
203                     BulkScanJob = i;
204                     BulkSubmit = bulk->next;
205                     if (BulkSubmit == NULL)
206                               BulkSubmitTail = &BulkSubmit;
207 
208                     bulk->state = ONRUN;
209                     JobsAry[i].active = bulk;
210                     pthread_cond_signal(&JobsAry[i].cond);
211                     ++BulkCurJobs;
212           }
213           pthread_mutex_unlock(&BulkMutex);
214 }
215 
216 /*
217  * Retrieve completed job or job with activity
218  */
219 bulk_t *
getbulk(void)220 getbulk(void)
221 {
222           bulk_t *bulk;
223 
224           pthread_mutex_lock(&BulkMutex);
225           while (BulkCurJobs && BulkResponse == NULL) {
226                     pthread_cond_wait(&BulkResponseCond, &BulkMutex);
227           }
228           if (BulkResponse) {
229                     bulk = BulkResponse;
230                     ddassert(bulk->state == ONRESPONSE);
231                     BulkResponse = bulk->next;
232                     if (BulkResponse == NULL)
233                               BulkResponseTail = &BulkResponse;
234                     bulk->state = UNLISTED;
235           } else {
236                     bulk = NULL;
237           }
238           pthread_mutex_unlock(&BulkMutex);
239           bulkstart();
240 
241           return bulk;
242 }
243 
244 void
freebulk(bulk_t * bulk)245 freebulk(bulk_t *bulk)
246 {
247           ddassert(bulk->state == UNLISTED);
248 
249           if (bulk->s1) {
250                     free(bulk->s1);
251                     bulk->s1 = NULL;
252           }
253           if (bulk->s2) {
254                     free(bulk->s2);
255                     bulk->s2 = NULL;
256           }
257           if (bulk->s3) {
258                     free(bulk->s3);
259                     bulk->s3 = NULL;
260           }
261           if (bulk->s4) {
262                     free(bulk->s4);
263                     bulk->s4 = NULL;
264           }
265           if (bulk->r1) {
266                     free(bulk->r1);
267                     bulk->r1 = NULL;
268           }
269           if (bulk->r2) {
270                     free(bulk->r2);
271                     bulk->r2 = NULL;
272           }
273           if (bulk->r3) {
274                     free(bulk->r3);
275                     bulk->r3 = NULL;
276           }
277           if (bulk->r4) {
278                     free(bulk->r4);
279                     bulk->r4 = NULL;
280           }
281           free(bulk);
282 }
283 
284 #if 0
285 
/*
 * Read exactly 'bytes' bytes from fd into buf, resuming after short
 * reads and retrying when read() fails with EINTR.
 *
 * Returns 0 on success, non-zero if the full count could not be read
 * (end-of-file or any other read error).
 */
static
int
readall(int fd, void *buf, size_t bytes)
{
	char *dst = buf;
	ssize_t got;

	while ((got = read(fd, dst, bytes)) != (ssize_t)bytes) {
		if (got > 0) {
			/* short read: advance and ask for the rest */
			dst += got;
			bytes -= got;
		} else if (got == 0 || errno != EINTR) {
			return 1;
		}
	}
	return 0;
}
310 
/*
 * Write exactly 'bytes' bytes from buf to fd, resuming after short
 * writes and retrying when write() fails with EINTR.
 *
 * Returns 0 on success, non-zero if the full count could not be written.
 */
static
int
writeall(int fd, const void *buf, size_t bytes)
{
	const char *src = buf;
	ssize_t put;

	while ((put = write(fd, src, bytes)) != (ssize_t)bytes) {
		if (put > 0) {
			/* short write: advance and send the rest */
			src += put;
			bytes -= put;
		} else if (put == 0 || errno != EINTR) {
			return 1;
		}
	}
	return 0;
}
332 
333 #endif
334 
335 static void *
bulkthread(void * arg)336 bulkthread(void *arg)
337 {
338           job_t *job = arg;
339           bulk_t *bulk;
340 
341           pthread_mutex_lock(&BulkMutex);
342           for (;;) {
343                     if (job->terminate)
344                               break;
345                     if (job->active == NULL)
346                               pthread_cond_wait(&job->cond, &BulkMutex);
347                     bulk = job->active;
348                     if (bulk) {
349                               bulk->state = ISRUNNING;
350 
351                               pthread_mutex_unlock(&BulkMutex);
352                               BulkFunc(bulk);
353                               pthread_mutex_lock(&BulkMutex);
354 
355                               bulk->state = ONRESPONSE;
356                               bulk->next = NULL;
357                               *BulkResponseTail = bulk;
358                               BulkResponseTail = &bulk->next;
359                               --BulkCurJobs;
360                               pthread_cond_signal(&BulkResponseCond);
361                     }
362 
363                     /*
364                      * Optimization - automatically fetch the next job
365                      */
366                     if ((bulk = BulkSubmit) != NULL && job->terminate == 0) {
367                               BulkSubmit = bulk->next;
368                               if (BulkSubmit == NULL)
369                                         BulkSubmitTail = &BulkSubmit;
370                               bulk->state = ONRUN;
371                               job->active = bulk;
372                               ++BulkCurJobs;
373                     } else {
374                               job->active = NULL;
375                     }
376           }
377           pthread_mutex_unlock(&BulkMutex);
378 
379           return NULL;
380 }
381