MapReduce Frameworks Lab

From CSE231 Wiki
Jump to navigation Jump to search

Motivation

Dealing with big data is Hansel-level hot right now. We will build two implementations to better understand the inner workings of Hadoop and Spark-like frameworks. Your frameworks will actually be more support more than just MapReduce.

The Matrix implementation gives us experience with dividing the work up into thread confined tasks whose results are then combined together.

Background

wikipedia article on MapReduce

As the name suggests, MapReduce refers to the process of mapping then reducing some stream of data. At its core, all a MapReduce algorithm entails is transforming a list of one kind of item before collecting those items and reducing them down to a single value using some computation. As you can probably tell, the concept of MapReduce is extremely general and can apply to a wide berth of problems. In this assignment, we will use MapReduce to simulate the Facebook mutual friends algorithm (finding the number of mutual friends between two people) and word count algorithm. As studios, you will use MapReduce to find which infected well(s) is causing an outbreak of cholera in historic London and to map a deck of cards.

For more information on the general concept of MapReduce, refer to this article.

Mistakes To Avoid

Attention niels epting.svg Warning: Arrays (and Matrices) are initially filled with null. You must fill them with instances.
Attention niels epting.svg Warning: Ensure that your Slices studio is conforming to spec.
Attention niels epting.svg Warning: Do NOT iterate over all of the original unsliced data from slice.getOriginalUnslicedData(). Use getMinInclusive() and getMaxExclusive() when processing your slice.

Code To Use

You can find all of the relevant files for this assignment under the mapreduce directory. From there, all of the classes you will need to implement can be found under mapreduce.assignment or mapreduce.studio. The core directories are utility and building block classes we created for you and the viz directories are visualization apps that might help you understand your code from a visual standpoint. Take a look at these classes to get a better understanding of how to use them for your assignment.

To allow our frameworks to work well with JDK8 Streams, we employ a couple of standard interfaces over creating out own custom ones.

BiConsumer

We use the standard BiConsumer<T,U> interface with an BiConsumer's accept(t,u) method in the place of a mythical, custom MapContext<K,V> interface with an emit(key,value) method.

public interface BiConsumer<T, U> {
    // invoke accept with each key and value you wish to emit
    void accept(T t, U u);
}

Collector READ THE JAVADOC

We use the standard Collector<T,A,R> interface in place of a mythical, custom AccumulatorReducer<V,A,R> interface.

Note: we strongly encourage you to read the Collector<T,A,R> Javadoc.

To repeat: we strongly encourage you to read the Collector<T,A,R> Javadoc.

public interface Collector<T, A, R> {

	// invoke supplier().get() to create a new mutable container
	Supplier<A> supplier();

	// invoke accumulator().accept(container, item) to add item to a container
	BiConsumer<A, T> accumulator();

	// invoke combiner().apply(containerA, containerB) to combine one container into the other
	BinaryOperator<A> combiner();

	// invoke finisher().apply(container) to reduce a container to its final form
	Function<A, R> finisher();
}

supplier get

We use supplier().get() to create a new mutable container. For classic map reduce this would be a List<V>.

mythical code analogy: container = collector.supplier().get() container = accumulatorReducer.createContainer()

accumulator accept

We use accumulator().accept(container,item) to accumulate a value. For classic map reduce this would add an item to a list.

mythical code analogy: collector.accumulator().accept(container,item); accumulatorReducer.accumulate(container,item)

combiner apply

We use combiner().apply(containerA,containerB) to combine two accumulators. You may combine containerB into containerA or containerA into containerB. Just return whichever is the combined result.

mythical code analogy: collector.combiner().apply(containerA,containerB) accumulatorReducer.combine(containerA,containerB)

finisher apply

We use finisher().apply(container) to reduce an accumulator.

mythical code analogy: collector.finisher().apply(container) r = accumulatorReducer.reduce(container)

A Path To Victory

Watch MapReduce with Playing Cards

Read Finding Mutual Friends

Read java.util.stream.Collector Javadoc

Implement #Cards MapReduce Studio

Implement #WordCountConcreteStaticMapReduce

Implement #MutualFriendsConcreteStaticMapReduce

Implement #WordCountMapper

Implement #IntegerSumClassicReducer

Implement MutualFriendsMapper

Implement MutualFriendsReducer

Implement ClassicReducer

Implement #Simple MapReduce Framework

Implement #Cholera MapReduce Studio

Implement #Matrix MapReduce Framework

Optional Warm Up

WordCountConcreteStaticMapReduce

class: WordCountConcreteStaticMapReduce.java Java.png
methods: map
reduceCreateList
reduceAccumulate
reduceCombine
reduceFinish
mapAll
accumulateAll
finishAll
package: mapreduce.framework.warmup.wordcount
source folder: student/src/main/java


Test Suite: WordCountMapReduceWarmUpTestSuite

Mapper

void map(TextSection textSection, BiConsumer<String, Integer> keyValuePairConsumer)

Reducer

List<Integer> reduceCreateList()

void reduceAccumulate(List<Integer> list, int v)

void reduceCombine(List<Integer> a, List<Integer> b)

int reduceFinish(List<Integer> list)

Framework

List<KeyValuePair<String, Integer>>[] mapAll(TextSection[] input)

Map<String, List<Integer>> accumulateAll(List<KeyValuePair<String, Integer>>[] mapAllResults)

Map<String, Integer> finishAll(Map<String, List<Integer>> accumulateAllResult)

MutualFriendsConcreteStaticMapReduce

class: MutualFriendsConcreteStaticMapReduce.java Java.png
methods: map
reduceCreateList
reduceAccumulate
reduceCombine
reduceFinish
mapAll
accumulateAll
finishAll
package: mapreduce.framework.warmup.friends
source folder: student/src/main/java

Test Suite: MutualFriendsMapReduceWarmUpTestSuite

Mapper

{{Sequential|static void map(Account account, BiConsumer<OrderedPair<AccountId>, Set<AccountId>> keyValuePairConsumer)}

Reducer

method: static List<Set<AccountId>> reduceCreateList() Sequential.svg (sequential implementation only) method: static void reduceAccumulate(List<Set<AccountId>> list, Set<AccountId> v) Sequential.svg (sequential implementation only) method: static void reduceCombine(List<Set<AccountId>> a, List<Set<AccountId>> b) Sequential.svg (sequential implementation only) method: static MutualFriendIds reduceFinish(List<Set<AccountId>> list) Sequential.svg (sequential implementation only)

Framework

method: static List<KeyValuePair<OrderedPair<AccountId>, Set<AccountId>>>[] mapAll(Account[] input) Parallel.svg (parallel implementation required) method: static Map<OrderedPair<AccountId>, List<Set<AccountId>>> accumulateAll(List<KeyValuePair<OrderedPair<AccountId>, Set<AccountId>>>[] mapAllResults) Sequential.svg (sequential implementation only) method: static Map<OrderedPair<AccountId>, MutualFriendIds> finishAll(Map<OrderedPair<AccountId>, List<Set<AccountId>>> accumulateAllResult) Parallel.svg (parallel implementation required)

Required Lab

Simple MapReduce Framework

class: SimpleMapReduceFramework.java Java.png
methods: mapAll
accumulateAll
finishAll
package: mapreduce.framework.lab.simple
source folder: student/src/main/java

Navigate to the SimpleMapReduceFramework.java class and there will be three methods for you to complete: mapAll, accumulateAll, and finishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.

mapAll

method: List<KeyValuePair<K, V>>[] mapAll(E[] input) Parallel.svg (parallel implementation required)

With this method, you will map all of the elements of an array of data into a new array of equivalent size consisting of a list of key value pairs. In order to do this, you must define the map() method for the mapper by specifying that it should add a new key value pair to a previously empty list. This list should then be added to the array of lists you previously defined, therefore completing the mapping stage of MapReduce. This should all be done in parallel.

Hint: you should create an array of lists equivalent in size to the original array. Each list will contain all of the emitted (key,value) pairs for its item.

accumulateAll

method: Map<K, A> accumulateAll(List<KeyValuePair<K, V>>[] mapAllResults) Sequential.svg (sequential implementation only)

This middle step is often excluded in more advanced MapReduce applications. When run in parallel, it is the only step of the framework that must be completed sequentially. In the matrix framework implementation, we will do away with this step altogether for the sake of performance.

In this method, you will take in the array of lists you previously created and accumulate the key value pairs in the lists into a newly defined map. Unlike the mapping phase, the map must account for the possibility of duplicates. To help deal with this issue, you must make use of the Collector provided to you. More specifically, access the accumulator in the collector by calling the accumulator() method and accept the key/value pair when you add it to the map. You probably noticed that the method must return a map of <K, A>, which differs from the <K, V> generics fed into the method. The framework is designed this way as the data originally fed into the mapping stage can be turned into a different form of data before reaching the reduce stage. Although we will not do this with any of our implementations, we designed the framework to allow this. In order to access the correct value for the map if the key has no associated value yet, use the supplier associated with the Collector with the supplier() method.

Hint: Look into the compute() method for maps.

finishAll

method: Map<K, R> finishAll(Map<K, A> accumulateAllResult) Parallel.svg (parallel implementation required)

This final step reduces the accumulated data and returns the final map in its reduced form. Again, you may notice that the method returns a map of <K, R> instead of the <K, A> which was returned in the accumulateAll method. This happens for the exact same reason as the accumulateAll method, as the framework is designed to handle cases in which the reduced data differs in type from the accumulated data.

To reduce the data down, use the map returned from the accumulateAll stage and put the results of the reduction into a new map. The provided Collector will come in extremely handy for this stage, more specifically the finisher which can be called using the finisher() method. This step should run in parallel and will probably be the easiest of the three methods.

Hint: Use the entrySet() method to get all of the entries in the given map and remember to use a ConcurrentHashMap instead of a regular HashMap to ensure the method can run in parallel.

Matrix MapReduce Framework

class: MatrixMapReduceFramework.java Java.png
methods: mapAndAccumulateAll
combineAndFinishAll
package: mapreduce.framework.lab.matrix
source folder: student/src/main/java

Navigate to the MatrixMapReduceFramework.java class and there will be two methods for you to complete: mapAndAccumulateAll and combineAndFinishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.

The matrix framework is much more complex than the simple framework, but it boosts performance by grouping the map and accumulate stages so that everything can run in parallel. It does so by slicing up the given data into the specified mapTaskCount number of slices and assigns a reduce task number to each entry using the provided getReduceIndex() method. This, in effect, creates a matrix of maps, hence the name of the framework. In the combineAndFinishAll stage, the matrix comes in handy by allowing us to go directly down the matrix (as each key is essentially grouped into a bucket), combining and reducing elements all-in-one. This concept was explained in more depth during class.

mapAndAccumulateAll

method: Map<K, A>[][] mapAndAccumulateAll(E[] input) Parallel.svg (parallel implementation required)

In this stage, you will map and accumulate a given array of data into a matrix of maps. This method should run in parallel while combining the map and accumulate portions of the simple framework (which we recommend you attempt first). As mentioned previously, the input should be sliced into a mapTaskCount number of slices and then mapped/accumulated into its rightful spot in the matrix. Although you can slice up the data into chunks yourself, we recommend using the Slice and SliceUtils classes introduced earlier in the course.

For each slice, the mapper should map the input into its rightful spot in the matrix and accumulate it into that specific map. Essentially, you will need to nestle the actions of the accumulate method into the mapper. In order to find where the input should go in the matrix, remember that each slice keeps track of its position relative to the other slices and the getReduceIndex method, mentioned above.

Hint: The number of rows should match the number of slices.

combineAndFinishAll

method: Map<K, R> combineAndFinishAll(Map<K, A>[][] input) Parallel.svg (parallel implementation required)

In this stage, you will take the matrix you just completed and combine all of the separate rows down to one array. Afterward, you will convert this combined array of maps into one final map. This method should run in parallel.

As mentioned previously, you should go directly down the matrix to access the same bucket across the different slices you created in the mapAndAccumulateAll step. For all of the maps in a column, you should go through each entry and combine it down into one row. You will need to make use of the Collector’s finisher again, but you will also need to make use of the combiner. You can access the Collector’s combiner using the combiner() method. Although the combine step differs from the simple framework, the finish step should mirror what you did previously.

Hint: You can use the provided MultiWrapMap class to return the final row as a valid output. You should also combine before you finish.

Test Suite: MapReduceAssignmentTestSuite

Associated Studios

Rubric

As always, please make sure to cite your work appropriately.

Total points: 100

Simple framework subtotal: 40

  • Correct mapAll (10)
  • Correct accumulateAll (20)
  • Correct finishAll (10)

Matrix framework subtotal: 50

  • Correct mapAndAccumulateAll (25)
  • Correct combineAndFinishAll (25)

Whole project:

  • Clarity and efficiency (10)