Difference between revisions of "MapReduce Frameworks Lab"

From CSE231 Wiki
 
(54 intermediate revisions by the same user not shown)
Line 1: Line 1:
 +
[[Bottleneck_MapReduce_Framework_Assignment|Bottleneck MapReduce Framework]]
 +
 +
[[Matrix_MapReduce_Framework_Assignment|Matrix MapReduce Framework]]
 +
 +
<!--
 
credit for this assignment: Finn Voichick and Dennis Cosgrove
 
credit for this assignment: Finn Voichick and Dennis Cosgrove
 
=Motivation=
 
=Motivation=
Line 38: Line 43:
  
 
=Code To Use=
 
=Code To Use=
To allow our frameworks to work well with JDK8 Streams, we employ a couple of standard interfaces over creating out own custom ones.
+
To allow our frameworks to work well with JDK8 Streams, we employ a couple of standard interfaces over creating our own custom ones.
  
 
==BiConsumer==
 
==BiConsumer==
Line 64: Line 69:
 
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html class Slices]
 
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html class Slices]
 
: [https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html#createNSlices(C%5B%5D,int) <nowiki>List<Slice<C[]>> createNSlices(C[] data, int numSlices)</nowiki>]
 
: [https://www.cse.wustl.edu/~cosgroved/courses/cse231/current/apidocs/slice/studio/Slices.html#createNSlices(C%5B%5D,int) <nowiki>List<Slice<C[]>> createNSlices(C[] data, int numSlices)</nowiki>]
 +
 +
==HashUtils==
 +
[https://www.cse.wustl.edu/~cosgroved/courses/cse231/s20/apidocs/hash/studio/HashUtils.html#toIndex(java.lang.Object,int) toIndex(key,N)]
  
 
==MultiWrapMap==
 
==MultiWrapMap==
Line 74: Line 82:
 
* [[#Optional_Warm_Up]]
 
* [[#Optional_Warm_Up]]
 
* [[#Bottlenecked_MapReduce_Framework]]
 
* [[#Bottlenecked_MapReduce_Framework]]
* Wait For Thursday's Class Session (If Applicable)
 
 
* [[Cholera_MapReduce_Application]]
 
* [[Cholera_MapReduce_Application]]
 
* [[#Matrix_MapReduce_Framework]]
 
* [[#Matrix_MapReduce_Framework]]
Line 87: Line 94:
 
==WordCountConcreteStaticMapReduce==
 
==WordCountConcreteStaticMapReduce==
 
{{CodeToImplement|WordCountConcreteStaticMapReduce|mapAll<br>accumulateAll<br>finishAll|mapreduce.framework.warmup.wordcount}}
 
{{CodeToImplement|WordCountConcreteStaticMapReduce|mapAll<br>accumulateAll<br>finishAll|mapreduce.framework.warmup.wordcount}}
 +
===Mapper===
 +
====map (Provided)====
 +
<nowiki> static void map(TextSection textSection, BiConsumer<String, Integer> keyValuePairConsumer) {
 +
mapper.map(textSection, keyValuePairConsumer);
 +
}</nowiki>
 +
 +
===Reducer===
 +
====createMutableContainer (Provided)====
 +
<nowiki> static List<Integer> createMutableContainer() {
 +
return collector.supplier().get();
 +
}</nowiki>
 +
 +
====accumulate (Provided)====
 +
<nowiki> static void accumulate(List<Integer> list, int v) {
 +
collector.accumulator().accept(list, v);
 +
}</nowiki>
 +
 +
====combine (Provided)====
 +
<nowiki> static List<Integer> combine(List<Integer> a, List<Integer> b) {
 +
return collector.combiner().apply(a, b);
 +
}</nowiki>
 +
 +
====reduce (Provided)====
 +
<nowiki> static int reduce(List<Integer> list) {
 +
return collector.finisher().apply(list);
 +
}</nowiki>
 +
 
===Framework===
 
===Framework===
 +
====mapAll====
 +
mapAll can be performed in parallel.  A task should be created for each item in the input array.  Each task should accept the emitted (key, value) pairs and store them in its own List to avoid data races.  These lists make up the array which is returned (one list for each item in the input array).
 +
 +
<youtube>-4prus5cNFo</youtube>
 +
 
{{Parallel|List<KeyValuePair<String, Integer>>[] mapAll(TextSection[] input)}}
 
{{Parallel|List<KeyValuePair<String, Integer>>[] mapAll(TextSection[] input)}}
  
 +
{{Warning| When first created, arrays of Objects are filled with null.  You will need to assign each array index to a new List before you start the process of adding key-value pairs.}}
 +
 +
{{Warning| Reminder: our course libraries consistently specify max to be exclusive.  This includes the parallel forall loop.}}
 +
 +
{{Tip|You are encouraged to utilize the provided [[#map_.28Provided.29|map]] method.}}
 +
 +
[[File:Map_all.svg]]
 +
 +
====accumulateAll====
 
{{Sequential|static Map<String, List<Integer>> accumulateAll(List<KeyValuePair<String, Integer>>[] mapAllResults)}}
 
{{Sequential|static Map<String, List<Integer>> accumulateAll(List<KeyValuePair<String, Integer>>[] mapAllResults)}}
  
 +
{{Tip|You are encouraged to utilize the provided [[#createMutableContainer_.28Provided.29|createMutableContainer]] and [[#accumulate_.28Provided.29|accumulate]] methods.}}
 +
 +
[[File:Accumulate_all.svg]]
 +
 +
====finishAll====
 
{{Parallel|static Map<String, Integer> finishAll(Map<String, List<Integer>> accumulateAllResult)}}
 
{{Parallel|static Map<String, Integer> finishAll(Map<String, List<Integer>> accumulateAllResult)}}
 +
 +
{{Tip|You are encouraged to utilize the provided [[#reduce_.28Provided.29|reduce]] method.}}
 +
 +
[[File:Finish_all.svg]]
  
 
===Testing Your Solution===
 
===Testing Your Solution===
Line 101: Line 158:
 
{{CodeToImplement|MutualFriendsConcreteStaticMapReduce|map<br>reduceCreateList<br>reduceAccumulate<br>reduceCombine<br>reduceFinish<br>mapAll<br>accumulateAll<br>finishAll|mapreduce.framework.warmup.friends}}
 
{{CodeToImplement|MutualFriendsConcreteStaticMapReduce|map<br>reduceCreateList<br>reduceAccumulate<br>reduceCombine<br>reduceFinish<br>mapAll<br>accumulateAll<br>finishAll|mapreduce.framework.warmup.friends}}
 
===Mapper===
 
===Mapper===
 +
====map====
 
{{Sequential|static void map(Account account, BiConsumer<OrderedPair<AccountId>, Set<AccountId>> keyValuePairConsumer)}}
 
{{Sequential|static void map(Account account, BiConsumer<OrderedPair<AccountId>, Set<AccountId>> keyValuePairConsumer)}}
  
 
===Reducer===
 
===Reducer===
{{Sequential|static List<Set<AccountId>> reduceCreateList()}}
+
====createMutableContainer====
 +
{{Sequential|static List<Set<AccountId>> createMutableContainer()}}
  
{{Sequential|static void reduceAccumulate(List<Set<AccountId>> list, Set<AccountId> v)}}
+
====accumulate====
 +
{{Sequential|static void accumulate(List<Set<AccountId>> list, Set<AccountId> v)}}
  
{{Sequential|static void reduceCombine(List<Set<AccountId>> a, List<Set<AccountId>> b)}}
+
====combine====
 +
{{Sequential|static void combine(List<Set<AccountId>> a, List<Set<AccountId>> b)}}
 +
 
 +
====reduce====
 +
{{Sequential|static AccountIdMutableContainer reduce(List<Set<AccountId>> list)}}
  
{{Sequential|static MutualFriendIds reduceFinish(List<Set<AccountId>> list)}}
 
 
===Framework===
 
===Framework===
 +
====mapAll====
 
{{Parallel|static List<KeyValuePair<OrderedPair<AccountId>, Set<AccountId>>>[] mapAll(Account[] input)}}
 
{{Parallel|static List<KeyValuePair<OrderedPair<AccountId>, Set<AccountId>>>[] mapAll(Account[] input)}}
  
 +
{{Warning| When first created, arrays of Objects are filled with null.  You will need to assign each array index to a new List before you start the process of adding key-value pairs.}}
 +
 +
{{Warning| Reminder: our course libraries consistently specify max to be exclusive.  This includes the parallel forall loop.}}
 +
 +
{{Tip|You are encouraged to utilize the [[#map|map]] method you implemented.}}
 +
====accumulateAll====
 
{{Sequential|static Map<OrderedPair<AccountId>, List<Set<AccountId>>> accumulateAll(List<KeyValuePair<OrderedPair<AccountId>, Set<AccountId>>>[] mapAllResults)}}
 
{{Sequential|static Map<OrderedPair<AccountId>, List<Set<AccountId>>> accumulateAll(List<KeyValuePair<OrderedPair<AccountId>, Set<AccountId>>>[] mapAllResults)}}
  
 +
{{Tip|You are encouraged to utilize the [[#createMutableContainer|createMutableContainer]] and [[#accumulate|accumulate]] methods you implemented.}}
 +
 +
====finishAll====
 
{{Parallel|static Map<OrderedPair<AccountId>, MutualFriendIds> finishAll(Map<OrderedPair<AccountId>, List<Set<AccountId>>> accumulateAllResult)}}
 
{{Parallel|static Map<OrderedPair<AccountId>, MutualFriendIds> finishAll(Map<OrderedPair<AccountId>, List<Set<AccountId>>> accumulateAllResult)}}
 +
 +
{{Tip|You are encouraged to utilize the [[#reduce|reduce]] method you implemented.}}
  
 
===Testing Your Solution===
 
===Testing Your Solution===
Line 131: Line 206:
  
 
===mapAll===
 
===mapAll===
 +
NOTE: If you struggle to get through this method, you are strongly encouraged to try the warm-ups.
 +
 
{{Parallel|List<KeyValuePair<K, V>>[] mapAll(E[] input)}}
 
{{Parallel|List<KeyValuePair<K, V>>[] mapAll(E[] input)}}
  
With this method, you will map all of the elements of an array of data into a new array of equivalent size consisting of a list of key value pairs. In order to do this, you must define the map() method for the mapper by specifying that it should add a new key value pair to a previously empty list. This list should then be added to the array of lists you previously defined, therefore completing the mapping stage of MapReduce. This should all be done in parallel.
+
{{Warning| When first created, arrays of Objects are filled with null.  You will need to assign each array index to a new List before you start the process of adding key-value pairs.}}
 +
 
 +
{{Warning| Reminder: our course libraries consistently specify max to be exclusive.  This includes the parallel forall loop.}}
 +
 
 +
With this method, you will map all of the elements of an array of data into a new array of equivalent size consisting of Lists of key value pairs. We will leverage the Mapper, which is a field/instance variable on this BottleneckedFramework instance.  Invoke the mapper's map method with an element of the input array and a BiConsumer which accepts each key and value passed to it by adding a KeyValuePair to its List. This list should then be stored in the array of lists you previously created, thereby completing the mapping stage of MapReduce. This should all be done in parallel.
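The steps described above can be sketched with only JDK classes. This is a minimal illustration, not the course solution: <code>KeyValuePair</code> and <code>Mapper</code> below are hypothetical stand-ins for the course types, the mapper is passed as a parameter rather than read from a field, and <code>IntStream.range(...).parallel()</code> is swapped in for the course's parallel forall loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;

final class MapAllSketch {
    // Hypothetical stand-in for the course's KeyValuePair class.
    static final class KeyValuePair<K, V> {
        final K key;
        final V value;

        KeyValuePair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical stand-in for the course's Mapper interface.
    interface Mapper<E, K, V> {
        void map(E item, BiConsumer<K, V> keyValuePairConsumer);
    }

    static <E, K, V> List<KeyValuePair<K, V>>[] mapAll(Mapper<E, K, V> mapper, E[] input) {
        // A freshly created array of Objects is filled with null.
        @SuppressWarnings("unchecked")
        List<KeyValuePair<K, V>>[] result = new List[input.length];
        // IntStream.range treats its upper bound as exclusive; parallel() stands in
        // for the course's parallel forall loop.
        IntStream.range(0, input.length).parallel().forEach(i -> {
            // Each task fills its own list, so no two tasks share mutable state.
            List<KeyValuePair<K, V>> list = new ArrayList<>();
            mapper.map(input[i], (key, value) -> list.add(new KeyValuePair<>(key, value)));
            result[i] = list;
        });
        return result;
    }

    public static void main(String[] args) {
        // A word-count style mapper: emit (word, 1) for every word in a line.
        Mapper<String, String, Integer> wordCountMapper = (line, consumer) -> {
            for (String word : line.split(" ")) {
                consumer.accept(word, 1);
            }
        };
        List<KeyValuePair<String, Integer>>[] lists =
                mapAll(wordCountMapper, new String[] {"a b a", "c"});
        System.out.println(lists[0].size() + " " + lists[1].size()); // prints "3 1"
    }
}
```

Because every task writes only to its own list and its own array index, the sketch needs no locking.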
  
 
Hint: you should create an array of lists equivalent in size to the original array. Each list will contain all of the emitted (key,value) pairs for its item.
 
Hint: you should create an array of lists equivalent in size to the original array. Each list will contain all of the emitted (key,value) pairs for its item.
  
[https://docs.google.com/presentation/d/1Gtpj6_5_8imUMccpwxmfrOxKSED791OKl4lZR6G2Bm4/pub?start=false&loop=false&delayms=3000&slide=id.g7ea283730e_0_130 [[File:Bottlenecked map all.png|400px]]]
+
[[File:Bottlenecked map all.png|400px]] [https://docs.google.com/presentation/d/1Gtpj6_5_8imUMccpwxmfrOxKSED791OKl4lZR6G2Bm4/pub?start=false&loop=false&delayms=3000&slide=id.g7ea283730e_0_130 slide]
  
 
===accumulateAll===
 
===accumulateAll===
Line 146: Line 227:
 
In this method, you will take in the array of lists you previously created and accumulate the key value pairs in the lists into a newly defined map.  To help deal with this issue, you must make use of the Collector provided to you. More specifically, access the accumulator in the collector by calling the <code>accumulator()</code> method and accept the key/value pair when you add it to the map. You probably noticed that the method must return a map of <K, A>, which differs from the <K, V> generics fed into the method. The framework is designed this way as the data originally fed into the mapping stage can be collected into a mutable container before reaching the finish/reduce stage. In order to access the correct value for the map if the key has no associated value yet, use the supplier associated with the Collector with the <code>supplier()</code> method.
 
In this method, you will take in the array of lists you previously created and accumulate the key value pairs in the lists into a newly defined map.  To help deal with this issue, you must make use of the Collector provided to you. More specifically, access the accumulator in the collector by calling the <code>accumulator()</code> method and accept the key/value pair when you add it to the map. You probably noticed that the method must return a map of <K, A>, which differs from the <K, V> generics fed into the method. The framework is designed this way as the data originally fed into the mapping stage can be collected into a mutable container before reaching the finish/reduce stage. In order to access the correct value for the map if the key has no associated value yet, use the supplier associated with the Collector with the <code>supplier()</code> method.
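One way to picture the accumulate step is the sequential sketch below. It is an illustration under assumptions, not the course solution: <code>KeyValuePair</code> is a hypothetical stand-in for the course class, the Collector is passed as a parameter, and the demo collector simply gathers values into a List. It relies on <code>Map.compute</code>, <code>supplier()</code>, and <code>accumulator()</code>, all of which are standard JDK methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;

final class AccumulateAllSketch {
    // Hypothetical stand-in for the course's KeyValuePair class.
    static final class KeyValuePair<K, V> {
        final K key;
        final V value;

        KeyValuePair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    static <K, V, A> Map<K, A> accumulateAll(
            Collector<V, A, ?> collector, List<KeyValuePair<K, V>>[] mapAllResults) {
        Map<K, A> result = new HashMap<>();
        for (List<KeyValuePair<K, V>> list : mapAllResults) {
            for (KeyValuePair<K, V> pair : list) {
                result.compute(pair.key, (key, container) -> {
                    // No container for this key yet: ask the supplier for a new one.
                    A target = (container != null) ? container : collector.supplier().get();
                    // Fold the value into the mutable container.
                    collector.accumulator().accept(target, pair.value);
                    return target;
                });
            }
        }
        return result;
    }

    // Demo input: two lists of (word, 1) pairs, as mapAll might produce them.
    static Map<String, List<Integer>> demo() {
        Collector<Integer, List<Integer>, List<Integer>> toList = Collector.of(
                ArrayList::new,
                List::add,
                (a, b) -> { a.addAll(b); return a; });
        @SuppressWarnings("unchecked")
        List<KeyValuePair<String, Integer>>[] mapAllResults = new List[] {
            Arrays.asList(new KeyValuePair<>("a", 1), new KeyValuePair<>("b", 1)),
            Arrays.asList(new KeyValuePair<>("a", 1))
        };
        return accumulateAll(toList, mapAllResults);
    }

    public static void main(String[] args) {
        System.out.println(demo().get("a")); // prints "[1, 1]"
    }
}
```

Note how the result maps keys to the container type A (here a List of Integers) rather than to the value type V, which is the <K, A> versus <K, V> distinction described above.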
  
Hint: Look into the <code>compute()</code> method for maps.
+
[[File:Bottlenecked accumulate all.png|400px]] [https://docs.google.com/presentation/d/1Gtpj6_5_8imUMccpwxmfrOxKSED791OKl4lZR6G2Bm4/pub?start=false&loop=false&delayms=3000&slide=id.g7ea283730e_0_165 slide]
 
 
[https://docs.google.com/presentation/d/1RG1RBPSDZu4LpVu-AC3lolWX4P3Tnl1cCMvjJ5Za74A/pub?start=false&loop=false&delayms=3000&slide=id.g30d086461f_0_99 slide]
 
  
 
===finishAll===
 
===finishAll===
Line 157: Line 236:
 
To reduce the data down, use the map returned from the accumulateAll stage and put the results of the reduction into a new map. The provided Collector will come in extremely handy for this stage, more specifically the finisher which can be called using the <code>finisher()</code> method. This step should run in parallel and will probably be the easiest of the three methods.
 
To reduce the data down, use the map returned from the accumulateAll stage and put the results of the reduction into a new map. The provided Collector will come in extremely handy for this stage, more specifically the finisher which can be called using the <code>finisher()</code> method. This step should run in parallel and will probably be the easiest of the three methods.
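The finish step can be sketched as follows, again using only JDK classes. The choices here are assumptions for illustration: a parallel stream over the entry set replaces the course's parallel constructs, and a <code>ConcurrentHashMap</code> (which rejects null keys and values) is used so that concurrent puts are safe. The demo collector sums the accumulated Integers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;

final class FinishAllSketch {
    static <K, A, R> Map<K, R> finishAll(Collector<?, A, R> collector, Map<K, A> accumulateAllResult) {
        // Many tasks write to this map at once, so it must be thread-safe.
        Map<K, R> result = new ConcurrentHashMap<>();
        accumulateAllResult.entrySet().parallelStream().forEach(entry ->
                // The finisher turns each mutable container A into a final result R.
                result.put(entry.getKey(), collector.finisher().apply(entry.getValue())));
        return result;
    }

    // Demo: containers are Lists of Integers and the finisher sums them.
    static Map<String, Integer> demo() {
        Collector<Integer, List<Integer>, Integer> sum = Collector.of(
                ArrayList::new,
                List::add,
                (a, b) -> { a.addAll(b); return a; },
                list -> list.stream().mapToInt(Integer::intValue).sum());
        Map<String, List<Integer>> accumulated = new HashMap<>();
        accumulated.put("a", new ArrayList<>(Arrays.asList(1, 1)));
        accumulated.put("b", new ArrayList<>(Arrays.asList(1)));
        return finishAll(sum, accumulated);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = demo();
        System.out.println(counts.get("a") + " " + counts.get("b")); // prints "2 1"
    }
}
```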
  
<!--
+
[[File:Bottlenecked finish all.png|400px]] [https://docs.google.com/presentation/d/1Gtpj6_5_8imUMccpwxmfrOxKSED791OKl4lZR6G2Bm4/pub?start=false&loop=false&delayms=3000&slide=id.g7ea283730e_0_221 slide]
[https://docs.google.com/presentation/d/1RG1RBPSDZu4LpVu-AC3lolWX4P3Tnl1cCMvjJ5Za74A/pub?start=false&loop=false&delayms=3000&slide=id.g30d086461f_0_200 slide]
 
-->
 
  
 
==Matrix MapReduce Framework==
 
==Matrix MapReduce Framework==
Line 168: Line 245:
 
Navigate to the <code>MatrixMapReduceFramework.java</code> class and there will be two methods for you to complete: mapAndAccumulateAll and combineAndFinishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.
 
Navigate to the <code>MatrixMapReduceFramework.java</code> class and there will be two methods for you to complete: mapAndAccumulateAll and combineAndFinishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.
  
The matrix framework is much more complex than the bottlenecked framework, but it boosts performance by grouping the map and accumulate stages so that everything can run in parallel. It does so by slicing up the given data into the specified mapTaskCount number of slices and assigns a reduce task number to each entry using the provided getReduceIndex() method. This, in effect, creates a matrix of maps, hence the name of the framework. In the combineAndFinishAll stage, the matrix comes in handy by allowing us to go directly down the matrix (as each key is essentially grouped into a bucket), combining and reducing elements all-in-one. This concept was explained in more depth during class.
+
The matrix framework is much more complex than the bottlenecked framework, but it boosts performance by grouping the map and accumulate stages so that everything can run in parallel. It does so by slicing up the given data into the specified mapTaskCount number of slices and assigning a reduce task number to each entry using the HashUtils toIndex() method. This, in effect, creates a matrix of dictionaries, hence the name of the framework. In the combineAndFinishAll stage, the matrix comes in handy by allowing us to go directly down the columns of the matrix (as each key is essentially grouped into a bucket), combining and reducing elements all-in-one. This concept was explained in more depth during class.
  
 
===mapAndAccumulateAll===
 
===mapAndAccumulateAll===
 
{{Parallel|Map<K, A>[][] mapAndAccumulateAll(E[] input)}}
 
{{Parallel|Map<K, A>[][] mapAndAccumulateAll(E[] input)}}
  
In this stage, you will map and accumulate a given array of data into a matrix of maps. This method should run in parallel while combining the map and accumulate portions of the bottlenecked framework (which we recommend you attempt first). As mentioned previously, the input should be sliced into a mapTaskCount number of slices and then mapped/accumulated into its rightful spot in the matrix. Although you can slice up the data into chunks yourself, we recommend using the <code>Slice</code> and <code>Slices</code> classes introduced earlier in the course.
+
In this stage, you will map and accumulate a given array of data into a matrix of dictionaries. This method should run in parallel while performing the map and accumulate portions of the bottlenecked framework (which we recommend you complete prior to embarking on this mission). As mentioned previously, the input should be sliced into a mapTaskCount number of IndexedRanges and then mapped/accumulated into its appropriate dictionary in the matrix. Although you could slice up the data into chunks yourself, we require using the same algorithm as the <code>IndexedRange</code> and <code>Slices</code> classes introduced earlier in the course.  This will let us provide better feedback so that you can pinpoint bugs sooner.  What is the best way to perform an identical algorithm to your Slices studio?  Use your Slices studio, of course.
  
For each slice, the mapper should map the input into its rightful spot in the matrix and accumulate it into that specific map. Essentially, you will need to nestle the actions of the accumulate method into the mapper. In order to find where the input should go in the matrix, remember that each slice keeps track of its position relative to the other slices and the getReduceIndex method, mentioned above.
+
For each slice, the mapper should map the input into its appropriate cell in the matrix and accumulate it into that specific dictionary. Essentially, you will need to nest the actions of the accumulate method inside the mapper. In order to find where the input should go in the matrix, remember that each slice keeps track of its index id and HashUtils has a toIndex method.  Which is applicable to the row and which is applicable to the column?
  
 
Hint: The number of rows should match the number of slices.
 
Hint: The number of rows should match the number of slices.
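The shape of the matrix can be sketched as below. Everything course-specific is replaced by a labeled assumption: <code>Mapper</code> is a hypothetical stand-in, <code>toIndex</code> is a guess at what HashUtils does (a non-negative remainder of the key's hash code), the slicing arithmetic is a simple even split rather than the required Slices studio algorithm, and <code>IntStream.range(...).parallel()</code> stands in for the course's parallel constructs. The framework fields are passed as parameters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.IntStream;

final class MapAndAccumulateAllSketch {
    // Hypothetical stand-in for the course's Mapper interface.
    interface Mapper<E, K, V> {
        void map(E item, BiConsumer<K, V> keyValuePairConsumer);
    }

    // Assumption: a stand-in for HashUtils.toIndex(key, n), yielding an index in [0, n).
    static int toIndex(Object key, int n) {
        return Math.floorMod(key.hashCode(), n);
    }

    static <E, K, V, A> Map<K, A>[][] mapAndAccumulateAll(
            Mapper<E, K, V> mapper, Collector<V, A, ?> collector,
            E[] input, int mapTaskCount, int reduceTaskCount) {
        @SuppressWarnings("unchecked")
        Map<K, A>[][] matrix = new Map[mapTaskCount][reduceTaskCount];
        // One task per slice; each task owns exactly one row of the matrix.
        IntStream.range(0, mapTaskCount).parallel().forEach(row -> {
            for (int col = 0; col < reduceTaskCount; col++) {
                matrix[row][col] = new HashMap<>();
            }
            // Assumption: simple even slicing with an exclusive upper bound.
            int min = (int) ((long) row * input.length / mapTaskCount);
            int maxExclusive = (int) ((long) (row + 1) * input.length / mapTaskCount);
            for (int i = min; i < maxExclusive; i++) {
                mapper.map(input[i], (k, v) -> {
                    // The key picks the column; the slice picks the row.
                    Map<K, A> cell = matrix[row][toIndex(k, reduceTaskCount)];
                    A container = cell.computeIfAbsent(k, unused -> collector.supplier().get());
                    collector.accumulator().accept(container, v);
                });
            }
        });
        return matrix;
    }

    // Demo: word count over four lines, two map tasks, two reduce tasks.
    static Map<String, List<Integer>>[][] demo() {
        Mapper<String, String, Integer> wordCountMapper = (line, consumer) -> {
            for (String word : line.split(" ")) {
                consumer.accept(word, 1);
            }
        };
        Collector<Integer, List<Integer>, List<Integer>> toList = Collector.of(
                ArrayList::new, List::add, (a, b) -> { a.addAll(b); return a; });
        return mapAndAccumulateAll(
                wordCountMapper, toList, new String[] {"a b", "a", "b b", "a"}, 2, 2);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>>[][] matrix = demo();
        System.out.println(matrix.length + " rows, " + matrix[0].length + " columns");
    }
}
```

Since each task writes only to the dictionaries in its own row, plain <code>HashMap</code> instances suffice in this sketch, and every occurrence of a given key lands in the same column regardless of which slice produced it.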
  
[https://docs.google.com/presentation/d/1iqJw_bldkVv3AhSCM740FxSTd31-zZBjk71icZ233lk/pub?start=false&loop=false&delayms=3000&slide=id.g343eac61f6_0_319 slide]
+
[[File:Matrix map accumulate all.png|400px]] [https://docs.google.com/presentation/d/1bSLKsI5u2e_tFc0d-RSb0BIDwA-kD75U6yXfxvpMf6Y/pub?start=false&loop=false&delayms=3000&slide=id.g7ebdc248f6_0_347 slide]
[https://docs.google.com/presentation/d/1iqJw_bldkVv3AhSCM740FxSTd31-zZBjk71icZ233lk/pub?start=false&loop=false&delayms=3000&slide=id.g343eac61f6_0_495 slide]
+
 
 +
[[File:Matrix map accumulate art.png|400px]] [https://docs.google.com/presentation/d/1bSLKsI5u2e_tFc0d-RSb0BIDwA-kD75U6yXfxvpMf6Y/pub?start=false&loop=false&delayms=3000&slide=id.g7ebdc248f6_0_388 slide]
  
 
===combineAndFinishAll===
 
===combineAndFinishAll===
Line 191: Line 269:
 
Hint: You can use the provided MultiWrapMap class to return the final row as a valid output. You should also combine before you finish.
 
Hint: You can use the provided MultiWrapMap class to return the final row as a valid output. You should also combine before you finish.
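Walking down the columns, combining first and finishing second, can be sketched like this. It is an illustration under assumptions, not the course solution: the Collector is passed as a parameter, the matrix is assumed to have at least one row, <code>IntStream.range(...).parallel()</code> stands in for the course's parallel constructs, and the per-column results are copied into one <code>HashMap</code> where the course would wrap them with the provided MultiWrapMap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.IntStream;

final class CombineAndFinishAllSketch {
    static <K, A, R> Map<K, R> combineAndFinishAll(Collector<?, A, R> collector, Map<K, A>[][] matrix) {
        int columnCount = matrix[0].length; // assumes at least one row
        @SuppressWarnings("unchecked")
        Map<K, R>[] finishedColumns = new Map[columnCount];
        // One task per column; a given key only ever appears in one column.
        IntStream.range(0, columnCount).parallel().forEach(col -> {
            Map<K, A> combined = new HashMap<>();
            for (Map<K, A>[] row : matrix) {
                for (Map.Entry<K, A> entry : row[col].entrySet()) {
                    // Combine first: merge containers for the same key across rows.
                    combined.merge(entry.getKey(), entry.getValue(), collector.combiner());
                }
            }
            Map<K, R> finished = new HashMap<>();
            for (Map.Entry<K, A> entry : combined.entrySet()) {
                // Then finish: reduce each combined container to its result.
                finished.put(entry.getKey(), collector.finisher().apply(entry.getValue()));
            }
            finishedColumns[col] = finished;
        });
        // Stand-in for MultiWrapMap: copy the disjoint column maps into one map.
        Map<K, R> result = new HashMap<>();
        for (Map<K, R> column : finishedColumns) {
            result.putAll(column);
        }
        return result;
    }

    // Demo: a 2x2 matrix of word-count containers; the finisher sums each container.
    static Map<String, Integer> demo() {
        Collector<Integer, List<Integer>, Integer> sum = Collector.of(
                ArrayList::new,
                List::add,
                (a, b) -> { a.addAll(b); return a; },
                list -> list.stream().mapToInt(Integer::intValue).sum());
        @SuppressWarnings("unchecked")
        Map<String, List<Integer>>[][] matrix = new Map[2][2];
        for (Map<String, List<Integer>>[] row : matrix) {
            for (int col = 0; col < row.length; col++) {
                row[col] = new HashMap<>();
            }
        }
        matrix[0][1].put("a", new ArrayList<>(Arrays.asList(1, 1)));
        matrix[0][0].put("b", new ArrayList<>(Arrays.asList(1)));
        matrix[1][0].put("b", new ArrayList<>(Arrays.asList(1, 1)));
        matrix[1][1].put("a", new ArrayList<>(Arrays.asList(1)));
        return combineAndFinishAll(sum, matrix);
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = demo();
        System.out.println(counts.get("a") + " " + counts.get("b")); // prints "3 3"
    }
}
```

Copying into a single map costs an extra sequential pass, which is the overhead a wrapping view such as MultiWrapMap avoids.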
  
[https://docs.google.com/presentation/d/1iqJw_bldkVv3AhSCM740FxSTd31-zZBjk71icZ233lk/pub?start=false&loop=false&delayms=3000&slide=id.g343eac61f6_0_398 slide]
+
[[File:Matrix combine finish all.png|400px]] [https://docs.google.com/presentation/d/1bSLKsI5u2e_tFc0d-RSb0BIDwA-kD75U6yXfxvpMf6Y/pub?start=false&loop=false&delayms=3000&slide=id.g7ebdc248f6_0_425 slide]
[https://docs.google.com/presentation/d/1iqJw_bldkVv3AhSCM740FxSTd31-zZBjk71icZ233lk/pub?start=false&loop=false&delayms=3000&slide=id.g343eac61f6_0_523 slide]
 
  
 
=Testing Your Solution=
 
=Testing Your Solution=
Line 231: Line 308:
 
* Correct mapAndAccumulateAll (30)
 
* Correct mapAndAccumulateAll (30)
 
* Correct combineAndFinishAll (30)
 
* Correct combineAndFinishAll (30)
 +
-->

Latest revision as of 08:23, 3 March 2022