1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 /// \file       stream_encoder_mt.c
4 /// \brief      Multithreaded .xz Stream encoder
5 //
6 //  Author:     Lasse Collin
7 //
8 //  This file has been put into the public domain.
9 //  You can do whatever you want with this file.
10 //
11 ///////////////////////////////////////////////////////////////////////////////
12 
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
18 #include "outqueue.h"
19 
20 
/// Upper limit for the Block size. Dividing UINT64_MAX by LZMA_THREADS_MAX
/// guarantees that BLOCK_SIZE_MAX * LZMA_THREADS_MAX still fits in 64 bits,
/// which keeps integer overflow prevention simple when an unusually large
/// Block size is requested.
#define BLOCK_SIZE_MAX (UINT64_MAX / (LZMA_THREADS_MAX))
24 
25 
/// Life cycle of a worker thread. The numeric order is significant: the
/// code tests "state <= THR_FINISH" to decide whether a Block should be
/// encoded and "state >= THR_STOP" to detect that the main thread wants
/// the worker to abandon its work.
typedef enum {
	/// Idle; waiting for the main thread to hand over a new Block.
	THR_IDLE = 0,

	/// A Block is being encoded and more input may still arrive.
	THR_RUN = 1,

	/// A Block is being encoded and all of its input has already been
	/// given to the thread; no more input will be read.
	THR_FINISH = 2,

	/// The main thread asks the worker to drop whatever it is doing and
	/// go back to THR_IDLE without exiting.
	THR_STOP = 3,

	/// The main thread asks the worker to exit. Thread cancellation could
	/// have been used instead, but because THR_STOP has to be handled
	/// anyway, reusing the same state machine is simpler.
	THR_EXIT = 4,
} worker_state;
46 
47 typedef struct lzma_stream_coder_s lzma_stream_coder;
48 
49 typedef struct worker_thread_s worker_thread;
50 struct worker_thread_s {
51           worker_state state;
52 
53           /// Input buffer of coder->block_size bytes. The main thread will
54           /// put new input into this and update in_size accordingly. Once
55           /// no more input is coming, state will be set to THR_FINISH.
56           uint8_t *in;
57 
58           /// Amount of data available in the input buffer. This is modified
59           /// only by the main thread.
60           size_t in_size;
61 
62           /// Output buffer for this thread. This is set by the main
63           /// thread every time a new Block is started with this thread
64           /// structure.
65           lzma_outbuf *outbuf;
66 
67           /// Pointer to the main structure is needed when putting this
68           /// thread back to the stack of free threads.
69           lzma_stream_coder *coder;
70 
71           /// The allocator is set by the main thread. Since a copy of the
72           /// pointer is kept here, the application must not change the
73           /// allocator before calling lzma_end().
74           const lzma_allocator *allocator;
75 
76           /// Amount of uncompressed data that has already been compressed.
77           uint64_t progress_in;
78 
79           /// Amount of compressed data that is ready.
80           uint64_t progress_out;
81 
82           /// Block encoder
83           lzma_next_coder block_encoder;
84 
85           /// Compression options for this Block
86           lzma_block block_options;
87 
88           /// Next structure in the stack of free worker threads.
89           worker_thread *next;
90 
91           mythread_mutex mutex;
92           mythread_cond cond;
93 
94           /// The ID of this thread is used to join the thread
95           /// when it's not needed anymore.
96           mythread thread_id;
97 };
98 
99 
100 struct lzma_stream_coder_s {
101           enum {
102                     SEQ_STREAM_HEADER,
103                     SEQ_BLOCK,
104                     SEQ_INDEX,
105                     SEQ_STREAM_FOOTER,
106           } sequence;
107 
108           /// Start a new Block every block_size bytes of input unless
109           /// LZMA_FULL_FLUSH or LZMA_FULL_BARRIER is used earlier.
110           size_t block_size;
111 
112           /// The filter chain currently in use
113           lzma_filter filters[LZMA_FILTERS_MAX + 1];
114 
115 
116           /// Index to hold sizes of the Blocks
117           lzma_index *index;
118 
119           /// Index encoder
120           lzma_next_coder index_encoder;
121 
122 
123           /// Stream Flags for encoding the Stream Header and Stream Footer.
124           lzma_stream_flags stream_flags;
125 
126           /// Buffer to hold Stream Header and Stream Footer.
127           uint8_t header[LZMA_STREAM_HEADER_SIZE];
128 
129           /// Read position in header[]
130           size_t header_pos;
131 
132 
133           /// Output buffer queue for compressed data
134           lzma_outq outq;
135 
136 
137           /// Maximum wait time if cannot use all the input and cannot
138           /// fill the output buffer. This is in milliseconds.
139           uint32_t timeout;
140 
141 
142           /// Error code from a worker thread
143           lzma_ret thread_error;
144 
145           /// Array of allocated thread-specific structures
146           worker_thread *threads;
147 
148           /// Number of structures in "threads" above. This is also the
149           /// number of threads that will be created at maximum.
150           uint32_t threads_max;
151 
152           /// Number of thread structures that have been initialized, and
153           /// thus the number of worker threads actually created so far.
154           uint32_t threads_initialized;
155 
156           /// Stack of free threads. When a thread finishes, it puts itself
157           /// back into this stack. This starts as empty because threads
158           /// are created only when actually needed.
159           worker_thread *threads_free;
160 
161           /// The most recent worker thread to which the main thread writes
162           /// the new input from the application.
163           worker_thread *thr;
164 
165 
166           /// Amount of uncompressed data in Blocks that have already
167           /// been finished.
168           uint64_t progress_in;
169 
170           /// Amount of compressed data in Stream Header + Blocks that
171           /// have already been finished.
172           uint64_t progress_out;
173 
174 
175           mythread_mutex mutex;
176           mythread_cond cond;
177 };
178 
179 
180 /// Tell the main thread that something has gone wrong.
181 static void
worker_error(worker_thread * thr,lzma_ret ret)182 worker_error(worker_thread *thr, lzma_ret ret)
183 {
184           assert(ret != LZMA_OK);
185           assert(ret != LZMA_STREAM_END);
186 
187           mythread_sync(thr->coder->mutex) {
188                     if (thr->coder->thread_error == LZMA_OK)
189                               thr->coder->thread_error = ret;
190 
191                     mythread_cond_signal(&thr->coder->cond);
192           }
193 
194           return;
195 }
196 
197 
198 static worker_state
worker_encode(worker_thread * thr,worker_state state)199 worker_encode(worker_thread *thr, worker_state state)
200 {
201           assert(thr->progress_in == 0);
202           assert(thr->progress_out == 0);
203 
204           // Set the Block options.
205           thr->block_options = (lzma_block){
206                     .version = 0,
207                     .check = thr->coder->stream_flags.check,
208                     .compressed_size = thr->coder->outq.buf_size_max,
209                     .uncompressed_size = thr->coder->block_size,
210 
211                     // TODO: To allow changing the filter chain, the filters
212                     // array must be copied to each worker_thread.
213                     .filters = thr->coder->filters,
214           };
215 
216           // Calculate maximum size of the Block Header. This amount is
217           // reserved in the beginning of the buffer so that Block Header
218           // along with Compressed Size and Uncompressed Size can be
219           // written there.
220           lzma_ret ret = lzma_block_header_size(&thr->block_options);
221           if (ret != LZMA_OK) {
222                     worker_error(thr, ret);
223                     return THR_STOP;
224           }
225 
226           // Initialize the Block encoder.
227           ret = lzma_block_encoder_init(&thr->block_encoder,
228                               thr->allocator, &thr->block_options);
229           if (ret != LZMA_OK) {
230                     worker_error(thr, ret);
231                     return THR_STOP;
232           }
233 
234           size_t in_pos = 0;
235           size_t in_size = 0;
236 
237           thr->outbuf->size = thr->block_options.header_size;
238           const size_t out_size = thr->coder->outq.buf_size_max;
239 
240           do {
241                     mythread_sync(thr->mutex) {
242                               // Store in_pos and out_pos into *thr so that
243                               // an application may read them via
244                               // lzma_get_progress() to get progress information.
245                               //
246                               // NOTE: These aren't updated when the encoding
247                               // finishes. Instead, the final values are taken
248                               // later from thr->outbuf.
249                               thr->progress_in = in_pos;
250                               thr->progress_out = thr->outbuf->size;
251 
252                               while (in_size == thr->in_size
253                                                   && thr->state == THR_RUN)
254                                         mythread_cond_wait(&thr->cond, &thr->mutex);
255 
256                               state = thr->state;
257                               in_size = thr->in_size;
258                     }
259 
260                     // Return if we were asked to stop or exit.
261                     if (state >= THR_STOP)
262                               return state;
263 
264                     lzma_action action = state == THR_FINISH
265                                         ? LZMA_FINISH : LZMA_RUN;
266 
267                     // Limit the amount of input given to the Block encoder
268                     // at once. This way this thread can react fairly quickly
269                     // if the main thread wants us to stop or exit.
270                     static const size_t in_chunk_max = 16384;
271                     size_t in_limit = in_size;
272                     if (in_size - in_pos > in_chunk_max) {
273                               in_limit = in_pos + in_chunk_max;
274                               action = LZMA_RUN;
275                     }
276 
277                     ret = thr->block_encoder.code(
278                                         thr->block_encoder.coder, thr->allocator,
279                                         thr->in, &in_pos, in_limit, thr->outbuf->buf,
280                                         &thr->outbuf->size, out_size, action);
281           } while (ret == LZMA_OK && thr->outbuf->size < out_size);
282 
283           switch (ret) {
284           case LZMA_STREAM_END:
285                     assert(state == THR_FINISH);
286 
287                     // Encode the Block Header. By doing it after
288                     // the compression, we can store the Compressed Size
289                     // and Uncompressed Size fields.
290                     ret = lzma_block_header_encode(&thr->block_options,
291                                         thr->outbuf->buf);
292                     if (ret != LZMA_OK) {
293                               worker_error(thr, ret);
294                               return THR_STOP;
295                     }
296 
297                     break;
298 
299           case LZMA_OK:
300                     // The data was incompressible. Encode it using uncompressed
301                     // LZMA2 chunks.
302                     //
303                     // First wait that we have gotten all the input.
304                     mythread_sync(thr->mutex) {
305                               while (thr->state == THR_RUN)
306                                         mythread_cond_wait(&thr->cond, &thr->mutex);
307 
308                               state = thr->state;
309                               in_size = thr->in_size;
310                     }
311 
312                     if (state >= THR_STOP)
313                               return state;
314 
315                     // Do the encoding. This takes care of the Block Header too.
316                     thr->outbuf->size = 0;
317                     ret = lzma_block_uncomp_encode(&thr->block_options,
318                                         thr->in, in_size, thr->outbuf->buf,
319                                         &thr->outbuf->size, out_size);
320 
321                     // It shouldn't fail.
322                     if (ret != LZMA_OK) {
323                               worker_error(thr, LZMA_PROG_ERROR);
324                               return THR_STOP;
325                     }
326 
327                     break;
328 
329           default:
330                     worker_error(thr, ret);
331                     return THR_STOP;
332           }
333 
334           // Set the size information that will be read by the main thread
335           // to write the Index field.
336           thr->outbuf->unpadded_size
337                               = lzma_block_unpadded_size(&thr->block_options);
338           assert(thr->outbuf->unpadded_size != 0);
339           thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
340 
341           return THR_FINISH;
342 }
343 
344 
345 static MYTHREAD_RET_TYPE
worker_start(void * thr_ptr)346 worker_start(void *thr_ptr)
347 {
348           worker_thread *thr = thr_ptr;
349           worker_state state = THR_IDLE; // Init to silence a warning
350 
351           while (true) {
352                     // Wait for work.
353                     mythread_sync(thr->mutex) {
354                               while (true) {
355                                         // The thread is already idle so if we are
356                                         // requested to stop, just set the state.
357                                         if (thr->state == THR_STOP) {
358                                                   thr->state = THR_IDLE;
359                                                   mythread_cond_signal(&thr->cond);
360                                         }
361 
362                                         state = thr->state;
363                                         if (state != THR_IDLE)
364                                                   break;
365 
366                                         mythread_cond_wait(&thr->cond, &thr->mutex);
367                               }
368                     }
369 
370                     assert(state != THR_IDLE);
371                     assert(state != THR_STOP);
372 
373                     if (state <= THR_FINISH)
374                               state = worker_encode(thr, state);
375 
376                     if (state == THR_EXIT)
377                               break;
378 
379                     // Mark the thread as idle unless the main thread has
380                     // told us to exit. Signal is needed for the case
381                     // where the main thread is waiting for the threads to stop.
382                     mythread_sync(thr->mutex) {
383                               if (thr->state != THR_EXIT) {
384                                         thr->state = THR_IDLE;
385                                         mythread_cond_signal(&thr->cond);
386                               }
387                     }
388 
389                     mythread_sync(thr->coder->mutex) {
390                               // Mark the output buffer as finished if
391                               // no errors occurred.
392                               thr->outbuf->finished = state == THR_FINISH;
393 
394                               // Update the main progress info.
395                               thr->coder->progress_in
396                                                   += thr->outbuf->uncompressed_size;
397                               thr->coder->progress_out += thr->outbuf->size;
398                               thr->progress_in = 0;
399                               thr->progress_out = 0;
400 
401                               // Return this thread to the stack of free threads.
402                               thr->next = thr->coder->threads_free;
403                               thr->coder->threads_free = thr;
404 
405                               mythread_cond_signal(&thr->coder->cond);
406                     }
407           }
408 
409           // Exiting, free the resources.
410           mythread_mutex_destroy(&thr->mutex);
411           mythread_cond_destroy(&thr->cond);
412 
413           lzma_next_end(&thr->block_encoder, thr->allocator);
414           lzma_free(thr->in, thr->allocator);
415           return MYTHREAD_RET_VALUE;
416 }
417 
418 
419 /// Make the threads stop but not exit. Optionally wait for them to stop.
420 static void
threads_stop(lzma_stream_coder * coder,bool wait_for_threads)421 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
422 {
423           // Tell the threads to stop.
424           for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425                     mythread_sync(coder->threads[i].mutex) {
426                               coder->threads[i].state = THR_STOP;
427                               mythread_cond_signal(&coder->threads[i].cond);
428                     }
429           }
430 
431           if (!wait_for_threads)
432                     return;
433 
434           // Wait for the threads to settle in the idle state.
435           for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436                     mythread_sync(coder->threads[i].mutex) {
437                               while (coder->threads[i].state != THR_IDLE)
438                                         mythread_cond_wait(&coder->threads[i].cond,
439                                                             &coder->threads[i].mutex);
440                     }
441           }
442 
443           return;
444 }
445 
446 
447 /// Stop the threads and free the resources associated with them.
448 /// Wait until the threads have exited.
449 static void
threads_end(lzma_stream_coder * coder,const lzma_allocator * allocator)450 threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
451 {
452           for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453                     mythread_sync(coder->threads[i].mutex) {
454                               coder->threads[i].state = THR_EXIT;
455                               mythread_cond_signal(&coder->threads[i].cond);
456                     }
457           }
458 
459           for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460                     int ret = mythread_join(coder->threads[i].thread_id);
461                     assert(ret == 0);
462                     (void)ret;
463           }
464 
465           lzma_free(coder->threads, allocator);
466           return;
467 }
468 
469 
470 /// Initialize a new worker_thread structure and create a new thread.
471 static lzma_ret
initialize_new_thread(lzma_stream_coder * coder,const lzma_allocator * allocator)472 initialize_new_thread(lzma_stream_coder *coder,
473                     const lzma_allocator *allocator)
474 {
475           worker_thread *thr = &coder->threads[coder->threads_initialized];
476 
477           thr->in = lzma_alloc(coder->block_size, allocator);
478           if (thr->in == NULL)
479                     return LZMA_MEM_ERROR;
480 
481           if (mythread_mutex_init(&thr->mutex))
482                     goto error_mutex;
483 
484           if (mythread_cond_init(&thr->cond))
485                     goto error_cond;
486 
487           thr->state = THR_IDLE;
488           thr->allocator = allocator;
489           thr->coder = coder;
490           thr->progress_in = 0;
491           thr->progress_out = 0;
492           thr->block_encoder = LZMA_NEXT_CODER_INIT;
493 
494           if (mythread_create(&thr->thread_id, &worker_start, thr))
495                     goto error_thread;
496 
497           ++coder->threads_initialized;
498           coder->thr = thr;
499 
500           return LZMA_OK;
501 
502 error_thread:
503           mythread_cond_destroy(&thr->cond);
504 
505 error_cond:
506           mythread_mutex_destroy(&thr->mutex);
507 
508 error_mutex:
509           lzma_free(thr->in, allocator);
510           return LZMA_MEM_ERROR;
511 }
512 
513 
514 static lzma_ret
get_thread(lzma_stream_coder * coder,const lzma_allocator * allocator)515 get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
516 {
517           // If there are no free output subqueues, there is no
518           // point to try getting a thread.
519           if (!lzma_outq_has_buf(&coder->outq))
520                     return LZMA_OK;
521 
522           // If there is a free structure on the stack, use it.
523           mythread_sync(coder->mutex) {
524                     if (coder->threads_free != NULL) {
525                               coder->thr = coder->threads_free;
526                               coder->threads_free = coder->threads_free->next;
527                     }
528           }
529 
530           if (coder->thr == NULL) {
531                     // If there are no uninitialized structures left, return.
532                     if (coder->threads_initialized == coder->threads_max)
533                               return LZMA_OK;
534 
535                     // Initialize a new thread.
536                     return_if_error(initialize_new_thread(coder, allocator));
537           }
538 
539           // Reset the parts of the thread state that have to be done
540           // in the main thread.
541           mythread_sync(coder->thr->mutex) {
542                     coder->thr->state = THR_RUN;
543                     coder->thr->in_size = 0;
544                     coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545                     mythread_cond_signal(&coder->thr->cond);
546           }
547 
548           return LZMA_OK;
549 }
550 
551 
552 static lzma_ret
stream_encode_in(lzma_stream_coder * coder,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,lzma_action action)553 stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator,
554                     const uint8_t *restrict in, size_t *restrict in_pos,
555                     size_t in_size, lzma_action action)
556 {
557           while (*in_pos < in_size
558                               || (coder->thr != NULL && action != LZMA_RUN)) {
559                     if (coder->thr == NULL) {
560                               // Get a new thread.
561                               const lzma_ret ret = get_thread(coder, allocator);
562                               if (coder->thr == NULL)
563                                         return ret;
564                     }
565 
566                     // Copy the input data to thread's buffer.
567                     size_t thr_in_size = coder->thr->in_size;
568                     lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569                                         &thr_in_size, coder->block_size);
570 
571                     // Tell the Block encoder to finish if
572                     //  - it has got block_size bytes of input; or
573                     //  - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574                     //    or LZMA_FULL_BARRIER was used.
575                     //
576                     // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577                     const bool finish = thr_in_size == coder->block_size
578                                         || (*in_pos == in_size && action != LZMA_RUN);
579 
580                     bool block_error = false;
581 
582                     mythread_sync(coder->thr->mutex) {
583                               if (coder->thr->state == THR_IDLE) {
584                                         // Something has gone wrong with the Block
585                                         // encoder. It has set coder->thread_error
586                                         // which we will read a few lines later.
587                                         block_error = true;
588                               } else {
589                                         // Tell the Block encoder its new amount
590                                         // of input and update the state if needed.
591                                         coder->thr->in_size = thr_in_size;
592 
593                                         if (finish)
594                                                   coder->thr->state = THR_FINISH;
595 
596                                         mythread_cond_signal(&coder->thr->cond);
597                               }
598                     }
599 
600                     if (block_error) {
601                               lzma_ret ret;
602 
603                               mythread_sync(coder->mutex) {
604                                         ret = coder->thread_error;
605                               }
606 
607                               return ret;
608                     }
609 
610                     if (finish)
611                               coder->thr = NULL;
612           }
613 
614           return LZMA_OK;
615 }
616 
617 
618 /// Wait until more input can be consumed, more output can be read, or
619 /// an optional timeout is reached.
620 static bool
wait_for_work(lzma_stream_coder * coder,mythread_condtime * wait_abs,bool * has_blocked,bool has_input)621 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
622                     bool *has_blocked, bool has_input)
623 {
624           if (coder->timeout != 0 && !*has_blocked) {
625                     // Every time when stream_encode_mt() is called via
626                     // lzma_code(), *has_blocked starts as false. We set it
627                     // to true here and calculate the absolute time when
628                     // we must return if there's nothing to do.
629                     //
630                     // The idea of *has_blocked is to avoid unneeded calls
631                     // to mythread_condtime_set(), which may do a syscall
632                     // depending on the operating system.
633                     *has_blocked = true;
634                     mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
635           }
636 
637           bool timed_out = false;
638 
639           mythread_sync(coder->mutex) {
640                     // There are four things that we wait. If one of them
641                     // becomes possible, we return.
642                     //  - If there is input left, we need to get a free
643                     //    worker thread and an output buffer for it.
644                     //  - Data ready to be read from the output queue.
645                     //  - A worker thread indicates an error.
646                     //  - Time out occurs.
647                     while ((!has_input || coder->threads_free == NULL
648                                                   || !lzma_outq_has_buf(&coder->outq))
649                                         && !lzma_outq_is_readable(&coder->outq)
650                                         && coder->thread_error == LZMA_OK
651                                         && !timed_out) {
652                               if (coder->timeout != 0)
653                                         timed_out = mythread_cond_timedwait(
654                                                             &coder->cond, &coder->mutex,
655                                                             wait_abs) != 0;
656                               else
657                                         mythread_cond_wait(&coder->cond,
658                                                             &coder->mutex);
659                     }
660           }
661 
662           return timed_out;
663 }
664 
665 
666 static lzma_ret
stream_encode_mt(void * coder_ptr,const lzma_allocator * allocator,const uint8_t * restrict in,size_t * restrict in_pos,size_t in_size,uint8_t * restrict out,size_t * restrict out_pos,size_t out_size,lzma_action action)667 stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
668                     const uint8_t *restrict in, size_t *restrict in_pos,
669                     size_t in_size, uint8_t *restrict out,
670                     size_t *restrict out_pos, size_t out_size, lzma_action action)
671 {
672           lzma_stream_coder *coder = coder_ptr;
673 
674           switch (coder->sequence) {
675           case SEQ_STREAM_HEADER:
676                     lzma_bufcpy(coder->header, &coder->header_pos,
677                                         sizeof(coder->header),
678                                         out, out_pos, out_size);
679                     if (coder->header_pos < sizeof(coder->header))
680                               return LZMA_OK;
681 
682                     coder->header_pos = 0;
683                     coder->sequence = SEQ_BLOCK;
684 
685           // Fall through
686 
687           case SEQ_BLOCK: {
688                     // Initialized to silence warnings.
689                     lzma_vli unpadded_size = 0;
690                     lzma_vli uncompressed_size = 0;
691                     lzma_ret ret = LZMA_OK;
692 
693                     // These are for wait_for_work().
694                     bool has_blocked = false;
695                     mythread_condtime wait_abs;
696 
697                     while (true) {
698                               mythread_sync(coder->mutex) {
699                                         // Check for Block encoder errors.
700                                         ret = coder->thread_error;
701                                         if (ret != LZMA_OK) {
702                                                   assert(ret != LZMA_STREAM_END);
703                                                   break;
704                                         }
705 
706                                         // Try to read compressed data to out[].
707                                         ret = lzma_outq_read(&coder->outq,
708                                                             out, out_pos, out_size,
709                                                             &unpadded_size,
710                                                             &uncompressed_size);
711                               }
712 
713                               if (ret == LZMA_STREAM_END) {
714                                         // End of Block. Add it to the Index.
715                                         ret = lzma_index_append(coder->index,
716                                                             allocator, unpadded_size,
717                                                             uncompressed_size);
718 
719                                         // If we didn't fill the output buffer yet,
720                                         // try to read more data. Maybe the next
721                                         // outbuf has been finished already too.
722                                         if (*out_pos < out_size)
723                                                   continue;
724                               }
725 
726                               if (ret != LZMA_OK) {
727                                         // coder->thread_error was set or
728                                         // lzma_index_append() failed.
729                                         threads_stop(coder, false);
730                                         return ret;
731                               }
732 
733                               // Try to give uncompressed data to a worker thread.
734                               ret = stream_encode_in(coder, allocator,
735                                                   in, in_pos, in_size, action);
736                               if (ret != LZMA_OK) {
737                                         threads_stop(coder, false);
738                                         return ret;
739                               }
740 
741                               // See if we should wait or return.
742                               //
743                               // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744                               if (*in_pos == in_size) {
745                                         // LZMA_RUN: More data is probably coming
746                                         // so return to let the caller fill the
747                                         // input buffer.
748                                         if (action == LZMA_RUN)
749                                                   return LZMA_OK;
750 
751                                         // LZMA_FULL_BARRIER: The same as with
752                                         // LZMA_RUN but tell the caller that the
753                                         // barrier was completed.
754                                         if (action == LZMA_FULL_BARRIER)
755                                                   return LZMA_STREAM_END;
756 
757                                         // Finishing or flushing isn't completed until
758                                         // all input data has been encoded and copied
759                                         // to the output buffer.
760                                         if (lzma_outq_is_empty(&coder->outq)) {
761                                                   // LZMA_FINISH: Continue to encode
762                                                   // the Index field.
763                                                   if (action == LZMA_FINISH)
764                                                             break;
765 
766                                                   // LZMA_FULL_FLUSH: Return to tell
767                                                   // the caller that flushing was
768                                                   // completed.
769                                                   if (action == LZMA_FULL_FLUSH)
770                                                             return LZMA_STREAM_END;
771                                         }
772                               }
773 
774                               // Return if there is no output space left.
775                               // This check must be done after testing the input
776                               // buffer, because we might want to use a different
777                               // return code.
778                               if (*out_pos == out_size)
779                                         return LZMA_OK;
780 
781                               // Neither in nor out has been used completely.
782                               // Wait until there's something we can do.
783                               if (wait_for_work(coder, &wait_abs, &has_blocked,
784                                                   *in_pos < in_size))
785                                         return LZMA_TIMED_OUT;
786                     }
787 
788                     // All Blocks have been encoded and the threads have stopped.
789                     // Prepare to encode the Index field.
790                     return_if_error(lzma_index_encoder_init(
791                                         &coder->index_encoder, allocator,
792                                         coder->index));
793                     coder->sequence = SEQ_INDEX;
794 
795                     // Update the progress info to take the Index and
796                     // Stream Footer into account. Those are very fast to encode
797                     // so in terms of progress information they can be thought
798                     // to be ready to be copied out.
799                     coder->progress_out += lzma_index_size(coder->index)
800                                         + LZMA_STREAM_HEADER_SIZE;
801           }
802 
803           // Fall through
804 
805           case SEQ_INDEX: {
806                     // Call the Index encoder. It doesn't take any input, so
807                     // those pointers can be NULL.
808                     const lzma_ret ret = coder->index_encoder.code(
809                                         coder->index_encoder.coder, allocator,
810                                         NULL, NULL, 0,
811                                         out, out_pos, out_size, LZMA_RUN);
812                     if (ret != LZMA_STREAM_END)
813                               return ret;
814 
815                     // Encode the Stream Footer into coder->buffer.
816                     coder->stream_flags.backward_size
817                                         = lzma_index_size(coder->index);
818                     if (lzma_stream_footer_encode(&coder->stream_flags,
819                                         coder->header) != LZMA_OK)
820                               return LZMA_PROG_ERROR;
821 
822                     coder->sequence = SEQ_STREAM_FOOTER;
823           }
824 
825           // Fall through
826 
827           case SEQ_STREAM_FOOTER:
828                     lzma_bufcpy(coder->header, &coder->header_pos,
829                                         sizeof(coder->header),
830                                         out, out_pos, out_size);
831                     return coder->header_pos < sizeof(coder->header)
832                                         ? LZMA_OK : LZMA_STREAM_END;
833           }
834 
835           assert(0);
836           return LZMA_PROG_ERROR;
837 }
838 
839 
840 static void
stream_encoder_mt_end(void * coder_ptr,const lzma_allocator * allocator)841 stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
842 {
843           lzma_stream_coder *coder = coder_ptr;
844 
845           // Threads must be killed before the output queue can be freed.
846           threads_end(coder, allocator);
847           lzma_outq_end(&coder->outq, allocator);
848 
849           for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850                     lzma_free(coder->filters[i].options, allocator);
851 
852           lzma_next_end(&coder->index_encoder, allocator);
853           lzma_index_end(coder->index, allocator);
854 
855           mythread_cond_destroy(&coder->cond);
856           mythread_mutex_destroy(&coder->mutex);
857 
858           lzma_free(coder, allocator);
859           return;
860 }
861 
862 
863 /// Options handling for lzma_stream_encoder_mt_init() and
864 /// lzma_stream_encoder_mt_memusage()
865 static lzma_ret
get_options(const lzma_mt * options,lzma_options_easy * opt_easy,const lzma_filter ** filters,uint64_t * block_size,uint64_t * outbuf_size_max)866 get_options(const lzma_mt *options, lzma_options_easy *opt_easy,
867                     const lzma_filter **filters, uint64_t *block_size,
868                     uint64_t *outbuf_size_max)
869 {
870           // Validate some of the options.
871           if (options == NULL)
872                     return LZMA_PROG_ERROR;
873 
874           if (options->flags != 0 || options->threads == 0
875                               || options->threads > LZMA_THREADS_MAX)
876                     return LZMA_OPTIONS_ERROR;
877 
878           if (options->filters != NULL) {
879                     // Filter chain was given, use it as is.
880                     *filters = options->filters;
881           } else {
882                     // Use a preset.
883                     if (lzma_easy_preset(opt_easy, options->preset))
884                               return LZMA_OPTIONS_ERROR;
885 
886                     *filters = opt_easy->filters;
887           }
888 
889           // Block size
890           if (options->block_size > 0) {
891                     if (options->block_size > BLOCK_SIZE_MAX)
892                               return LZMA_OPTIONS_ERROR;
893 
894                     *block_size = options->block_size;
895           } else {
896                     // Determine the Block size from the filter chain.
897                     *block_size = lzma_mt_block_size(*filters);
898                     if (*block_size == 0)
899                               return LZMA_OPTIONS_ERROR;
900 
901                     assert(*block_size <= BLOCK_SIZE_MAX);
902           }
903 
904           // Calculate the maximum amount output that a single output buffer
905           // may need to hold. This is the same as the maximum total size of
906           // a Block.
907           *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908           if (*outbuf_size_max == 0)
909                     return LZMA_MEM_ERROR;
910 
911           return LZMA_OK;
912 }
913 
914 
915 static void
get_progress(void * coder_ptr,uint64_t * progress_in,uint64_t * progress_out)916 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
917 {
918           lzma_stream_coder *coder = coder_ptr;
919 
920           // Lock coder->mutex to prevent finishing threads from moving their
921           // progress info from the worker_thread structure to lzma_stream_coder.
922           mythread_sync(coder->mutex) {
923                     *progress_in = coder->progress_in;
924                     *progress_out = coder->progress_out;
925 
926                     for (size_t i = 0; i < coder->threads_initialized; ++i) {
927                               mythread_sync(coder->threads[i].mutex) {
928                                         *progress_in += coder->threads[i].progress_in;
929                                         *progress_out += coder->threads[i]
930                                                             .progress_out;
931                               }
932                     }
933           }
934 
935           return;
936 }
937 
938 
939 static lzma_ret
stream_encoder_mt_init(lzma_next_coder * next,const lzma_allocator * allocator,const lzma_mt * options)940 stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
941                     const lzma_mt *options)
942 {
943           lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
944 
945           // Get the filter chain.
946           lzma_options_easy easy;
947           const lzma_filter *filters;
948           uint64_t block_size;
949           uint64_t outbuf_size_max;
950           return_if_error(get_options(options, &easy, &filters,
951                               &block_size, &outbuf_size_max));
952 
953 #if SIZE_MAX < UINT64_MAX
954           if (block_size > SIZE_MAX)
955                     return LZMA_MEM_ERROR;
956 #endif
957 
958           // Validate the filter chain so that we can give an error in this
959           // function instead of delaying it to the first call to lzma_code().
960           // The memory usage calculation verifies the filter chain as
961           // a side effect so we take advatange of that.
962           if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963                     return LZMA_OPTIONS_ERROR;
964 
965           // Validate the Check ID.
966           if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967                     return LZMA_PROG_ERROR;
968 
969           if (!lzma_check_is_supported(options->check))
970                     return LZMA_UNSUPPORTED_CHECK;
971 
972           // Allocate and initialize the base structure if needed.
973           lzma_stream_coder *coder = next->coder;
974           if (coder == NULL) {
975                     coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976                     if (coder == NULL)
977                               return LZMA_MEM_ERROR;
978 
979                     next->coder = coder;
980 
981                     // For the mutex and condition variable initializations
982                     // the error handling has to be done here because
983                     // stream_encoder_mt_end() doesn't know if they have
984                     // already been initialized or not.
985                     if (mythread_mutex_init(&coder->mutex)) {
986                               lzma_free(coder, allocator);
987                               next->coder = NULL;
988                               return LZMA_MEM_ERROR;
989                     }
990 
991                     if (mythread_cond_init(&coder->cond)) {
992                               mythread_mutex_destroy(&coder->mutex);
993                               lzma_free(coder, allocator);
994                               next->coder = NULL;
995                               return LZMA_MEM_ERROR;
996                     }
997 
998                     next->code = &stream_encode_mt;
999                     next->end = &stream_encoder_mt_end;
1000                     next->get_progress = &get_progress;
1001 //                  next->update = &stream_encoder_mt_update;
1002 
1003                     coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004                     coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005                     coder->index = NULL;
1006                     memzero(&coder->outq, sizeof(coder->outq));
1007                     coder->threads = NULL;
1008                     coder->threads_max = 0;
1009                     coder->threads_initialized = 0;
1010           }
1011 
1012           // Basic initializations
1013           coder->sequence = SEQ_STREAM_HEADER;
1014           coder->block_size = (size_t)(block_size);
1015           coder->thread_error = LZMA_OK;
1016           coder->thr = NULL;
1017 
1018           // Allocate the thread-specific base structures.
1019           assert(options->threads > 0);
1020           if (coder->threads_max != options->threads) {
1021                     threads_end(coder, allocator);
1022 
1023                     coder->threads = NULL;
1024                     coder->threads_max = 0;
1025 
1026                     coder->threads_initialized = 0;
1027                     coder->threads_free = NULL;
1028 
1029                     coder->threads = lzma_alloc(
1030                                         options->threads * sizeof(worker_thread),
1031                                         allocator);
1032                     if (coder->threads == NULL)
1033                               return LZMA_MEM_ERROR;
1034 
1035                     coder->threads_max = options->threads;
1036           } else {
1037                     // Reuse the old structures and threads. Tell the running
1038                     // threads to stop and wait until they have stopped.
1039                     threads_stop(coder, true);
1040           }
1041 
1042           // Output queue
1043           return_if_error(lzma_outq_init(&coder->outq, allocator,
1044                               outbuf_size_max, options->threads));
1045 
1046           // Timeout
1047           coder->timeout = options->timeout;
1048 
1049           // Free the old filter chain and copy the new one.
1050           for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051                     lzma_free(coder->filters[i].options, allocator);
1052 
1053           return_if_error(lzma_filters_copy(
1054                               filters, coder->filters, allocator));
1055 
1056           // Index
1057           lzma_index_end(coder->index, allocator);
1058           coder->index = lzma_index_init(allocator);
1059           if (coder->index == NULL)
1060                     return LZMA_MEM_ERROR;
1061 
1062           // Stream Header
1063           coder->stream_flags.version = 0;
1064           coder->stream_flags.check = options->check;
1065           return_if_error(lzma_stream_header_encode(
1066                               &coder->stream_flags, coder->header));
1067 
1068           coder->header_pos = 0;
1069 
1070           // Progress info
1071           coder->progress_in = 0;
1072           coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1073 
1074           return LZMA_OK;
1075 }
1076 
1077 
1078 extern LZMA_API(lzma_ret)
lzma_stream_encoder_mt(lzma_stream * strm,const lzma_mt * options)1079 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1080 {
1081           lzma_next_strm_init(stream_encoder_mt_init, strm, options);
1082 
1083           strm->internal->supported_actions[LZMA_RUN] = true;
1084 //        strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1085           strm->internal->supported_actions[LZMA_FULL_FLUSH] = true;
1086           strm->internal->supported_actions[LZMA_FULL_BARRIER] = true;
1087           strm->internal->supported_actions[LZMA_FINISH] = true;
1088 
1089           return LZMA_OK;
1090 }
1091 
1092 
1093 // This function name is a monster but it's consistent with the older
1094 // monster names. :-( 31 chars is the max that C99 requires so in that
1095 // sense it's not too long. ;-)
1096 extern LZMA_API(uint64_t)
lzma_stream_encoder_mt_memusage(const lzma_mt * options)1097 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1098 {
1099           lzma_options_easy easy;
1100           const lzma_filter *filters;
1101           uint64_t block_size;
1102           uint64_t outbuf_size_max;
1103 
1104           if (get_options(options, &easy, &filters, &block_size,
1105                               &outbuf_size_max) != LZMA_OK)
1106                     return UINT64_MAX;
1107 
1108           // Memory usage of the input buffers
1109           const uint64_t inbuf_memusage = options->threads * block_size;
1110 
1111           // Memory usage of the filter encoders
1112           uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113           if (filters_memusage == UINT64_MAX)
1114                     return UINT64_MAX;
1115 
1116           filters_memusage *= options->threads;
1117 
1118           // Memory usage of the output queue
1119           const uint64_t outq_memusage = lzma_outq_memusage(
1120                               outbuf_size_max, options->threads);
1121           if (outq_memusage == UINT64_MAX)
1122                     return UINT64_MAX;
1123 
1124           // Sum them with overflow checking.
1125           uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126                               + sizeof(lzma_stream_coder)
1127                               + options->threads * sizeof(worker_thread);
1128 
1129           if (UINT64_MAX - total_memusage < inbuf_memusage)
1130                     return UINT64_MAX;
1131 
1132           total_memusage += inbuf_memusage;
1133 
1134           if (UINT64_MAX - total_memusage < filters_memusage)
1135                     return UINT64_MAX;
1136 
1137           total_memusage += filters_memusage;
1138 
1139           if (UINT64_MAX - total_memusage < outq_memusage)
1140                     return UINT64_MAX;
1141 
1142           return total_memusage + outq_memusage;
1143 }
1144