use warnings;

# Test-harness bootstrap: run from the t/ directory when it exists, make the
# in-tree library visible, and skip the whole test when perl was built
# without interpreter threads.
#
# NOTE: 'use strict' is intentionally enabled only after this block. %Config
# is imported when the block runs, not when it is compiled, so strict vars
# would reject the unqualified %Config lookup below.
BEGIN {
    chdir 't' if -d 't';
    push @INC, '../lib';
    require Config;
    Config->import;
    unless ($Config{'useithreads'}) {
        print "1..0 # Skip: no ithreads\n";
        exit 0;
    }
}

use strict;
use threads;
use threads::shared;    # explicit: ':shared' and lock() below depend on it
use Thread::Queue;

# Workload sizes.  The test plan is derived from these so it cannot drift
# out of sync with the loops below.
my $nthreads = 5;     # number of reader threads
my $nitems   = 20;    # number of ordinary values the writer enqueues

my $q = Thread::Queue->new;
$| = 1;               # unbuffered output so TAP lines appear as they happen

# One "ok" per dequeued element (the ordinary values plus one end marker per
# reader), plus a final "ok" once every reader has been joined.
my $planned = $nitems + $nthreads + 1;
print "1..$planned\n";

# Next TAP test number; shared so that all threads draw from one sequence.
my $test : shared = 1;

# Emit the next "ok N" line.  The lock keeps the print and the increment
# together so concurrent readers never reuse or skip a test number.
sub ok {
    lock($test);
    print "ok $test\n";
    $test++;
}

# Reader thread body: dequeue elements until the end marker (-1) is seen,
# reporting one "ok" per dequeued element (the end marker included).
sub reader {
    my $tid = threads->self->tid;    # referenced only by the debug traces
    my $i   = 0;                     # referenced only by the debug traces
    while (1) {
        $i++;
#        print "reader (tid $tid): waiting for element $i...\n";
        my $el = $q->dequeue;
        ok();
#        print "reader (tid $tid): dequeued element $i: value $el\n";
        # Random pause of up to one second to vary thread interleaving.
        select(undef, undef, undef, rand(1));
        if ($el == -1) {
            # end marker
#            print "reader (tid $tid) returning\n";
            return;
        }
    }
}

# Start the readers; each one receives its index as an argument, as before.
my @threads;
for my $i (0 .. $nthreads - 1) {
    push @threads, threads->new(\&reader, $i);
}

# Writer: enqueue random values in 0..99 with random pauses in between.
for my $i (1 .. $nitems) {
    my $el = int(rand(100));
    select(undef, undef, undef, rand(1));
#    print "writer: enqueuing value $el\n";
    $q->enqueue($el);
}

$q->enqueue((-1) x $nthreads);    # one end marker for each thread

for my $thr (@threads) {
#    print "waiting for join\n";
    $thr->join();
}

# Final test: every reader terminated and was joined.
ok();