Difference between revisions of "Threads and Executors"

From CSE231 Wiki
Jump to navigation Jump to search
 
(36 intermediate revisions by 2 users not shown)
Line 1: Line 1:
 +
This assignment has been updated: [[Thread_and_Executor_Service_Assignment|Thread and ExecutorService]]
 +
<!--
 +
credit for this assignment: [[User:Finn|Finn Voichick]] and [[User:cosgroved|Dennis Cosgrove]]
 
=Motivation=
 
=Motivation=
The [http://x10-lang.org/ X10 family of programming languages], including [https://wiki.rice.edu/confluence/display/HABANERO/Habanero+Extreme+Scale+Software+Research+Project Habanero] and our own modest [https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/edu/wustl/cse231s/v5/V5.html V5], provide a less cumbersome way to write parallel programs with [https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/edu/wustl/cse231s/v5/V5.html#async-edu.wustl.cse231s.v5.api.CheckedRunnable- async] and [https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/edu/wustl/cse231s/v5/V5.html#finish-edu.wustl.cse231s.v5.api.CheckedRunnable- finish].
+
The [http://x10-lang.org/ X10 family of programming languages], including [https://wiki.rice.edu/confluence/display/HABANERO/Habanero+Extreme+Scale+Software+Research+Project Habanero] and our own modest [https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/edu/wustl/cse231s/v5/V5.html V5], provide a less cumbersome way to write parallel programs with [https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/edu/wustl/cse231s/v5/V5.html#async-edu.wustl.cse231s.v5.api.CheckedRunnable- async] and [https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/edu/wustl/cse231s/v5/V5.html#finish-edu.wustl.cse231s.v5.api.CheckedRunnable- finish].
  
 
Finish has particularly nice semantics: keeping track of and joining all of its descendant tasks.   
 
Finish has particularly nice semantics: keeping track of and joining all of its descendant tasks.   
Line 12: Line 15:
 
=Code to Implement=
 
=Code to Implement=
 
==(Optional But Recommended) Join All Warm Up==
 
==(Optional But Recommended) Join All Warm Up==
 +
[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html ConcurrentLinkedQueue] is a thread safe data structure.  Multiple threads can add and remove from it without fear.  The order they will be in might not be deterministic, but it won't lose anybody.  We will end up using a ConcurrentLinkedQueue to track all of the spawned Futures in the [[#XQuicksort]] section of this lab.
 +
 +
Sadly, [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html ConcurrentLinkedQueue's documentation] reports that:
 +
 +
: ''"Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator."''
 +
 +
This means that if all of the items aren't in the queue when you start iterating through them, then you are not guaranteed to get updated when new items get added.
 +
 +
Therefore, using the standard iterating for loop from <code>ThreadsRightNow</code> will NOT work for <code>ThreadsEventually</code>:
  
source: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html
+
<nowiki>/* package-private */ static int joinAllInQueueViaIteration(Queue<Thread> queue) throws InterruptedException {
 +
int count = 0;
 +
for (Thread thread : queue) {
 +
thread.join();
 +
count++;
 +
}
 +
return count;
 +
}</nowiki>
  
''"Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator."''
+
Think about how you can make sure that all of threads are joined.
 +
<spoiler show="spoiler" hide="spoiler">Drain the queue by repeatedly polling until it is empty.  Be sure to actually join the threads.</spoiler>
  
 
{{CodeToImplement|ThreadsEventually|joinAllInQueueViaPoll|tnx.warmup.joinall}}
 
{{CodeToImplement|ThreadsEventually|joinAllInQueueViaPoll|tnx.warmup.joinall}}
Line 21: Line 41:
 
{{Sequential|private static int joinAllInQueueViaPoll(Queue<Thread> queue)}}
 
{{Sequential|private static int joinAllInQueueViaPoll(Queue<Thread> queue)}}
  
ThreadsEventually joinAllInQueueViaPoll
+
NOTE: this method should return the number of threads joined.
  
 
Completing this optional warm up will help you when you implement [[#XQuicksort]].
 
Completing this optional warm up will help you when you implement [[#XQuicksort]].
  
===Correctness===
+
===Testing Your Solution===
 +
====Correctness====
 
{{TestSuite|JoinAllTestSuite|tnx.warmup.joinall}}
 
{{TestSuite|JoinAllTestSuite|tnx.warmup.joinall}}
  
Line 40: Line 61:
 
:[https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#start-- start]
 
:[https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#start-- start]
 
:[https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#start-- join]
 
:[https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#start-- join]
 +
 +
As always, the [[Reference_Page#Threads|wiki's reference page]] can be of help.
  
 
===SimpleThreadFactory===
 
===SimpleThreadFactory===
Line 66: Line 89:
  
 
==Executors==
 
==Executors==
<youtube>8qh9LMhvNXI</youtube> <!-- box link: [https://wustl.box.com/s/2pqv6vsbirt6zi4304ycrh7jfgkb0dhi Executor submit and Future get]-->
+
<youtube>8qh9LMhvNXI</youtube>  
 
+
<youtube>tuAkLb99sLE</youtube>  
<youtube>tuAkLb99sLE</youtube> <!-- box link: [https://wustl.box.com/s/17q7xupx1tagdnnfg057teme4lbq4w6r Executor invokeAll]-->
 
  
 
===Code To Use===
 
===Code To Use===
Line 78: Line 100:
 
:[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#get-- get()]
 
:[https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#get-- get()]
  
[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/count/assignment/NucleobaseCounting.html NucleobaseCounting]
+
[http://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/count/assignment/NucleobaseCounting.html NucleobaseCounting]
: [http://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/count/assignment/NucleobaseCounting.html#countRangeSequential-byte:A-edu.wustl.cse231s.bioinformatics.Nucleobase-int-int- countRangeSequential(chromosome,targetNucleobase,min,maxExclusive)]
+
: [http://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/count/assignment/NucleobaseCounting.html#countRangeSequential-byte:A-edu.wustl.cse231s.bioinformatics.Nucleobase-int-int- countRangeSequential(chromosome,targetNucleobase,min,maxExclusive)]
  
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/slice/studio/Slices.html Slices]
+
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html Slices]
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/slice/studio/Slices.html#createNSlices-byte:A-int- <nowiki>createNSlices(data, numSlices)</nowiki>].
+
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html#createNSlices-byte:A-int- <nowiki>createNSlices(data, numSlices)</nowiki>].
  
<!--
+
 
 +
As always, the [[Reference_Page#Executors|wiki's reference page]] can be of help.
  
 
===Mistake To Avoid: Do NOT call shutdown===
 
===Mistake To Avoid: Do NOT call shutdown===
 
Each method you write will be passed an executor.  You need not (and should NOT) invoke [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown-- shutdown()].  That is the responsibility of whoever created the executor.  For example, a JUnit test would do this if it was appropriate (i.e. it created an executor and was not using the [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#commonPool-- common pool]).
 
Each method you write will be passed an executor.  You need not (and should NOT) invoke [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown-- shutdown()].  That is the responsibility of whoever created the executor.  For example, a JUnit test would do this if it was appropriate (i.e. it created an executor and was not using the [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#commonPool-- common pool]).
-->
 
  
 
===XNucleobaseCount===
 
===XNucleobaseCount===
Line 94: Line 116:
 
{{CodeToImplement|XNucleobaseCount|countLowerUpperSplit<br>countNWaySplit<br>countDivideAndConquer<br>countDivideAndConquerKernel|tnx.lab.executor}}
 
{{CodeToImplement|XNucleobaseCount|countLowerUpperSplit<br>countNWaySplit<br>countDivideAndConquer<br>countDivideAndConquerKernel|tnx.lab.executor}}
  
{{Tip|Use [http://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/count/assignment/NucleobaseCounting.html#countRangeSequential-byte:A-edu.wustl.cse231s.bioinformatics.Nucleobase-int-int- countRangeSequential()] from [[Nucleobase_Counting]]}}
+
{{Tip|Use [https://www.cse.wustl.edu/~dennis.cosgrove/courses/cse231/current/apidocs/count/lab/NucleobaseCounting.html#countRangeSequential(byte%5B%5D,edu.wustl.cse231s.bioinformatics.Nucleobase,int,int) countRangeSequential()] from [[Nucleobase_Counting]]}}
 
====lower upper split====
 
====lower upper split====
 
{{Parallel|int countLowerUpperSplit(ExecutorService executor, byte[] chromosome, Nucleobase nucleobase)}}
 
{{Parallel|int countLowerUpperSplit(ExecutorService executor, byte[] chromosome, Nucleobase nucleobase)}}
 +
 +
NOTE: the tests will enforce that you use [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#submit-java.util.concurrent.Callable- submit] and [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#get-- get].
 +
 
====n-way split====
 
====n-way split====
 
{{Parallel|int countNWaySplit(ExecutorService executor, byte[] chromosome, Nucleobase nucleobase, int numTasks)}}
 
{{Parallel|int countNWaySplit(ExecutorService executor, byte[] chromosome, Nucleobase nucleobase, int numTasks)}}
  
Please feel free to use your own [https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/slice/studio/Slices.html Slices] [https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/slice/studio/Slices.html#createNSlices-byte:A-int- <nowiki>createNSlices(byte[] data, int numSlices)</nowiki>].  Then use the [https://docs.oracle.com/javase/1.5.0/docs/guide/language/foreach.html Java for each loop] to iterate over the slices to create your tasks.
+
Please feel free to use your own [https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html Slices] [https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html#createNSlices-byte:A-int- <nowiki>createNSlices(byte[] data, int numSlices)</nowiki>].  Then use the [https://docs.oracle.com/javase/1.5.0/docs/guide/language/foreach.html Java for each loop] to iterate over the slices to create your tasks.
 +
 
 +
NOTE: the tests will enforce that you use [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#invokeAll-java.util.Collection- invokeAll].
  
 
====divide-and-conquer====
 
====divide-and-conquer====
Line 108: Line 135:
  
 
{{Parallel|int countDivideAndConquerKernel(ExecutorService executor, byte[] chromosome, Nucleobase nucleobase, int min, int maxExclusive, int threshold)}}
 
{{Parallel|int countDivideAndConquerKernel(ExecutorService executor, byte[] chromosome, Nucleobase nucleobase, int min, int maxExclusive, int threshold)}}
 +
 +
NOTE: the tests will enforce that you use [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#submit-java.util.concurrent.Callable- submit] and [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#get-- get].
 +
  
 
NOTE: When you get down below the threshold and convert from parallel to sequential execution, do NOT feel compelled to build a sequential divide and conquer.  Just invoke countRangeSequential on the remaining range.  It is not like divide and conquer gives you a performance benefit in counting like it would for sorting.
 
NOTE: When you get down below the threshold and convert from parallel to sequential execution, do NOT feel compelled to build a sequential divide and conquer.  Just invoke countRangeSequential on the remaining range.  It is not like divide and conquer gives you a performance benefit in counting like it would for sorting.
  
 
===XQuicksort===
 
===XQuicksort===
{{CodeToImplement|XQuicksort|sequentialQuicksort<br>sequentialQuicksortKernel<br>parallelQuicksort<br>parallelQuicksortKernel|tnx.lab.executor}}
 
 
 
Quicksort is an oldie but a goodie.  First developed in 1959 and [https://dl.acm.org/citation.cfm?doid=366622.366644 published in 1961] it is still the go to sorting algorithm today.  The JDK8 implementation of [https://docs.oracle.com/javase/8/docs/api/java/util/Arrays.html#sort-int:A- Arrays.sort(array)] is a DualPivotQuicksort.
 
Quicksort is an oldie but a goodie.  First developed in 1959 and [https://dl.acm.org/citation.cfm?doid=366622.366644 published in 1961] it is still the go to sorting algorithm today.  The JDK8 implementation of [https://docs.oracle.com/javase/8/docs/api/java/util/Arrays.html#sort-int:A- Arrays.sort(array)] is a DualPivotQuicksort.
  
Line 130: Line 158:
 
{{Warning|Unlike in Sarkar's and McDowell’s videos which use inclusive maximums, CSE 231 consistently uses exclusive maximums to avoid having to subtract 1 all of the time.}}
 
{{Warning|Unlike in Sarkar's and McDowell’s videos which use inclusive maximums, CSE 231 consistently uses exclusive maximums to avoid having to subtract 1 all of the time.}}
  
 +
====Code To Use====
 +
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/sort/core/quick/Partitioner.html interface Partitioner]
 +
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/sort/core/quick/Partitioner.html#partitionRange-int:A-int-int- partitionRange(data,min,maxExclusive)]
 +
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/sort/core/quick/PivotLocation.html class PivotLocation]
 +
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/sort/core/quick/PivotLocation.html#getLeftSidesUpperExclusive-- getLeftSidesUpperExclusive()]
 +
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/sort/core/quick/PivotLocation.html#getRightSidesLowerInclusive-- getRightSidesLowerInclusive()]
  
====Code To Use====
+
Note: Investigate SequentialPartitioner.
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/sort/core/quick/Partitioner.html interface Partitioner]
 
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/sort/core/quick/Partitioner.html#partitionRange-int:A-int-int- partitionRange(data,min,maxExclusive)]
 
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/sort/core/quick/PivotLocation.html class PivotLocation]
 
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/sort/core/quick/PivotLocation.html#getLeftSidesUpperExclusive-- getLeftSidesUpperExclusive()]
 
:[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s18/apidocs/sort/core/quick/PivotLocation.html#getRightSidesLowerInclusive-- getRightSidesLowerInclusive()]
 
  
====sequential====
+
====XSequentialQuicksorter====
 +
{{CodeToImplement|XSequentialQuicksorter|sortRange|tnx.lab.executor}}
  
<!--
+
{{Sequential|public void sortRange(T[] data, Comparator<T> comparator, int min, int maxExclusive)}}
The provided sequentialQuicksort method hopes to get you on the right track.
 
<nowiki>public static void sequentialQuicksort(int[] array) {
 
sequentialQuicksortKernel(array, 0, array.length);
 
}</nowiki>
 
-->
 
  
{{Sequential|void sequentialQuicksort(int[] data, Partitioner partitioner)}}
+
{{Warning|Do NOT implement your own partition.  Call Partitioner partitionRange method.  It will do this work for you.}}
  
{{Sequential|void sequentialQuicksortKernel(int[] data, int min, int maxExclusive, Partitioner partitioner)}}
+
{{Warning|Do NOT invoke partitionRange twice.  Invoke partitionRange once and catch the return value in a variable.  Then use the two methods on the single instance of PivotLocation. }}
  
====parallel====
+
====XParallelQuicksorter====
{{Parallel|void parallelQuicksort(ExecutorService executor, int[] data, int threshold, Partitioner partitioner)}}
+
{{CodeToImplement|XParallelQuicksorter|kernel<br/>sortRange|tnx.lab.executor}}
  
{{Tip | Make sure to use a thread safe data structure like [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html ConcurrentLinkedQueue] and NOT an unsafe data structure like [https://docs.oracle.com/javase/8/docs/api/java/util/LinkedList.html LinkedList]. }}
+
{{Parallel|private void kernel(Queue<Future<?>> futures, T[] data, Comparator<T> comparator, int min, int maxExclusive)}}
  
{{Tip | <code>return null;</code> from your lambdas to invoke the [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#submit-java.util.concurrent.Callable- overloaded Callable version of submit(task)] to deal with the checked exceptions. }}
+
{{Tip | Be sure to <code>test</code> the <code>isParallelPredicate</code> to determine if the current range length is worthy of parallel processing or if you should fall back to the <code>sequentialSorter</code>. }}
  
{{Parallel|void parallelQuicksortKernel(ExecutorService executor, int[] data, int min, int maxExclusive, Queue<Future<?>> futures, int threshold, Partitioner partitioner)}}
+
{{Sequential|public void sortRange(T[] data, Comparator<T> comparator, int min, int maxExclusive)}}
  
==(Optional) Parallel Partition Challenge==
+
{{Tip | Make sure to use a thread safe data structure like [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html ConcurrentLinkedQueue] and NOT an unsafe data structure like [https://docs.oracle.com/javase/8/docs/api/java/util/LinkedList.html LinkedList]. }}
The partitioning step can also be done in [http://www.classes.cec.wustl.edu/~cse341/web/handouts/lecture07.pdf parallel with scan]. While not particularly practical, it can get the CPL down to log^k(n).
 
  
{{CodeToImplement|ParallelPartitioner|partitionRange|sort.fun.quick}}
+
{{Tip | <code>return null;</code> from your lambdas to invoke the [https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#submit-java.util.concurrent.Callable- overloaded Callable version of submit(task)] to deal with the checked exceptions. }}
  
{{Parallel|PivotLocation partitionRange(int[] data, int min, int maxExclusive)}}
+
{{Warning|Do NOT implement your own partition.  Call Partitioner partitionRange method.  It will do this work for you.}}
  
Scan is an important part of parallelizing partition:
+
{{Warning|Do NOT invoke partitionRange twice.  Invoke partitionRange once and catch the return value in a variable.  Then use the two methods on the single instance of PivotLocation. }}
  
<youtube>RdfmxfZBHpo</youtube>
+
==(Optional) Parallel Partition Challenge==
 +
The partitioning step can also be done in [http://www.classes.cec.wustl.edu/~cse341/web/handouts/lecture07.pdf parallel with scan].  While not particularly practical, it can get the CPL down to <math>\mathcal{O}(\lg^k{}n)</math>.
  
<youtube>mmYv3Haj6uc</youtube>
+
For details on how to complete this challenge, check out: [[Quicksort_Parallel_Partitioner]]
  
 
=Testing Your Solution=
 
=Testing Your Solution=
Line 182: Line 207:
 
{{TestSuite|NucleobaseExecutorTestSuite|tnx.lab.executor}}
 
{{TestSuite|NucleobaseExecutorTestSuite|tnx.lab.executor}}
 
{{TestSuite|QuicksortExecutorTestSuite|tnx.lab.executor}}
 
{{TestSuite|QuicksortExecutorTestSuite|tnx.lab.executor}}
 +
====Less Time Consuming Suite====
 +
{{TestSuite|QuicksortExecutorNonWeaklyConsistentIteratorTestSuite|tnx.lab.executor}}
  
 
==Performance==
 
==Performance==
Line 213: Line 240:
  
 
=Pledge, Acknowledgments, Citations=
 
=Pledge, Acknowledgments, Citations=
As always, fill out the Pledge, Acknowledgments, and Citations file: <code>lab3-pledge-acknowledgments-citations.txt</code>
+
{{Pledge|lab-threads-and-executors}}
 +
-->

Latest revision as of 19:40, 22 February 2022

This assignment has been updated: Thread and ExecutorService