Matrix MapReduce Framework Assignment
credit for this assignment: Finn Voichick and Dennis Cosgrove
Contents
Motivation
Dealing with big data is Hansel-level hot right now. We will build two implementations to better understand the inner workings of Hadoop and Spark-like frameworks. Your frameworks will actually be more general than just MapReduce.
The Matrix implementation gives us experience with dividing the work up into thread confined tasks whose results are then combined together.
Background
wikipedia article on MapReduce
As the name suggests, MapReduce refers to the process of mapping then reducing some stream of data. At its core, all a MapReduce algorithm entails is transforming a list of one kind of item before collecting those items and reducing them down to a single values per key using some computation. As you can probably tell, the concept of MapReduce is extremely general and can apply to a wide berth of problems. In this assignment, we will use MapReduce to simulate the Facebook mutual friends algorithm (finding the mutual friends between all friend pairs) as well as pinpointing the offending well in the 1854 Soho Cholera Outbreak.
For more information on the general concept of MapReduce, refer to this article.
Java Advice
Parameterized Type Array Tip
We need to create an array of Maps in this assignment, but creating arrays of parameterized types in Java is madness inducing. Some details are available in Java Generics Restrictions.
The example below creates an array of List<String>. Creating Map<K, V> follows a similar syntax. The @SuppressWarnings
annotation is optional.
@SuppressWarnings("unchecked") List<String>[] result = new List[length];
Use map.entrySet()
Prefer the use of map.entrySet() over map.keySet() followed by looking up the value with map.get(key).
Use the appropriate version of join_fork_loop
there are many overloaded versions of join_fork_loop including:
choose the correct one for each situation where "correct" is often the one that produces the cleanest code.
Mistakes To Avoid
Warning: Arrays (and Matrices) are initially filled with null. You must fill them with instances before you act on individual elements. |
Warning: Ensure that your Slices studio is conforming to spec. |
Code To Use
To allow our frameworks to work well with JDK8 Streams, we employ a couple of standard interfaces over creating our own custom ones.
BiConsumer
We use the standard BiConsumer<T,U> interface with an BiConsumer's accept(t,u) method in the place of a mythical, custom MapContext<K,V> interface with an emit(key,value) method.
public interface BiConsumer<T, U> { // invoke accept with each key and value you wish to emit void accept(T t, U u); }
Reducer<V,A,R>
check out the [Background]
Range
Ranges
MultiWrapMap
class MultiWrapMap<K,V> extends AbstractMap<K, V>
- constructor MultiWrapMap(maps, hashFunction)
A Path To Victory
- Int Sum MR App Exercise
- Reducer Exercise
- Mutual Friends MR App Exercise
- Card Only Sequential Map Reduce Framework Warm Up
- Word Count Only Parallel MapReduce Framework Warm Up
- Bottlenecked MapReduce Framework Exercise
- This Exercise
HashUtils
class: | HashUtils.java | |
methods: | toIndex | |
package: | hash.exercise | |
source folder: | student/src/main/java |
toIndex(key,hashFunction,maxExclusive)
Matrix MapReduce Framework
class: | MatrixMapReduceFramework.java | |
methods: | mapAndAccumulateAll combineAndFinishAll |
|
package: | mapreduce.framework.lab.matrix | |
source folder: | student/src/main/java |
Navigate to the MatrixMapReduceFramework.java
class and there will be two methods for you to complete: mapAndAccumulateAll and combineAndFinishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.
The matrix framework is much more complex than the bottlenecked framework, but it boosts performance by grouping the map and accumulate stages so that everything can run in parallel. It does so by slicing up the given data into the specified mapTaskCount number of slices and assigns a reduce task number to each entry using the HashUtils toIndex() method. This, in effect, creates a matrix of dictionaries, hence the name of the framework. In the combineAndFinishAll stage, the matrix comes in handy by allowing us to go directly down the columns of the matrix (as each key is essentially grouped into a bucket), combining and reducing elements all-in-one. This concept was explained in more depth during class.
mapAndAccumulateAll
method: Map<K, A>[][] mapAndAccumulateAll(E[] input)
(parallel implementation required)
In this stage, you will map and accumulate a given array of data into a matrix of dictionaries. This method should run in parallel while performing the map and accumulate portions of the bottlenecked framework (which we recommend you complete prior to embarking on this mission). As mentioned previously, the input should be sliced into a mapTaskCount number of IndexedRanges and then mapped/accumulated into its appropriate dictionary in the matrix. Although you could slice up the data into chunks yourself, we require using an identical algorithm as performed the Range
and Ranges
classes introduced earlier in the course. This will allow us to provide better feedback to allow you to pinpoint bugs sooner. What is the best way to perform an identical algorithm to your Ranges exercise? Use your Ranges exercise, of course.
For each slice, the mapper should map the input into its appropriate cell in the matrix and accumulate it into that specific dictionary. Essentially, you will need to nestle the actions of the accumulate method into the mapper. In order to find where the input should go in the matrix, remember that you can either pass Map[].class
to join_fork_loop
or use join_void_fork_loop_with_index
and HashUtils has a toIndex method. Which is applicable to the row and which is applicable to the column?
Hint: The number of rows should match the number of Ranges.
Spring 2022 Note: if you wish to use join_void_fork_loop_with_index
you will need to add the static import to the top of the file:
import static fj.FJ.join_void_fork_loop_with_index;
Warning:Do NOT copy over the Bottlenecked approach. Perform accumulation live |
combineAndFinishAll
method: Map<K, R> combineAndFinishAll(Map<K, A>[][] input)
(parallel implementation required)
In this stage, you will take the matrix you just completed and combine all of the separate rows down to one array. Afterward, you will convert this combined array of maps into one final map. This method should run in parallel.
As mentioned previously, you should go directly down the matrix to access the same bucket across the different slices you created in the mapAndAccumulateAll step. For all of the maps in a column, you should go through each entry and combine it down into one row. You will need to make use of the Reducer’s reduce() method again, but you will also need to make use of the combine() method. Although a combine step was unnecessary for the bottlenecked framework, the reduce() step should mirror what you did previously.
Hint: You can use the provided MultiWrapMap class to collect the output and return it as a valid output. (MultiWrapMap implements Map.)
Testing Your Solution
Correctness
class: | _MatrixFrameworkTestSuite.java | |
package: | mapreduce | |
source folder: | testing/src/test/java |
MapAccumulateAll
class: | MatrixMapAccumulateAllTestSuite.java | |
package: | mapreduce.framework.matrix.exercise | |
source folder: | testing/src/test/java |
CombineFinishAll
class: | MatrixCombineFinishAllTestSuite.java | |
package: | mapreduce.framework.matrix.exercise | |
source folder: | testing/src/test/java |
Holistic
class: | MatrixHolisticTestSuite.java | |
package: | mapreduce.framework.matrix.exercise | |
source folder: | testing/src/test/java |