MapReduce Frameworks Lab

From CSE231 Wiki
Revision as of 07:05, 10 March 2017 by Cosgroved

The MapReduce assignment has many different pieces that come together to solve problems. There are so many, in fact, that writing each individual class or method can end up being less challenging than figuring out what role each piece plays in the larger puzzle.

At a high level we have a framework, a mapper, a reducer, and some data we would like to process.

Let’s start with the Framework. The Framework’s first job is mapAll(), which, unsurprisingly, calls map() on the Mapper for every piece of data. That’s it. Don’t overcomplicate it. The Framework leaves all of the details of which (key, value) pairs to emit to the Mapper; it just calls map() for every piece of data. Granted, it does need to create an instance of Context to receive those emissions. For the SimpleFramework, we create a context that holds a single dictionary to handle all of the (key, value) pairs the Mapper throws at it. This IndividualMapContext leaves grouping to the next stage, so it can simply associate each key with its value and be done.

For the MatrixFramework, we map and group at the same time, so its GroupStageSkippingMapContext is a bit more complicated, but the mapGroupAll() method on MatrixFramework stays nice and simple: it calls map() on the Mapper for each piece of data. As stated before, don’t overcomplicate it. You will want to create an n-way-split-esque separation of the data into tasks, each with its own GroupStageSkippingMapContext, but don’t lose sight of the goal: call map() on the Mapper for each piece of the data.
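The mapAll() job described above can be sketched in plain Java. This is a minimal, sequential sketch under stated assumptions: MapContext, Mapper, and IndividualMapContext below are simplified, hypothetical stand-ins for the lab’s types (no SuspendableException, no parallel constructs), and SimpleMapAllSketch is an invented name. It shows that mapAll() does nothing more than create one context per element and call map() with it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for the lab's types; not the course API.
class SimpleMapAllSketch {

    // Receives (key, value) emissions from a Mapper.
    interface MapContext<K, V> {
        void emit(K key, V value);
    }

    // Application specific: decides which (key, value) pairs to emit for one item.
    interface Mapper<E, K, V> {
        void map(MapContext<K, V> context, E item);
    }

    // One dictionary, no grouping; grouping is left to the next stage.
    static class IndividualMapContext<K, V> implements MapContext<K, V> {
        final Map<K, V> map = new HashMap<>();

        @Override
        public void emit(K key, V value) {
            // Relies on the lab's assumption that a context never receives the same key twice.
            map.put(key, value);
        }
    }

    // mapAll: create a context for every item and call map() with it. That's it.
    @SuppressWarnings("unchecked")
    static <E, K, V> IndividualMapContext<K, V>[] mapAll(E[] items, Mapper<E, K, V> mapper) {
        IndividualMapContext<K, V>[] contexts = new IndividualMapContext[items.length];
        for (int i = 0; i < items.length; i++) {
            contexts[i] = new IndividualMapContext<>();
            mapper.map(contexts[i], items[i]);
        }
        return contexts;
    }
}
```

Because each element gets its own context, the loop iterations share no state, which is presumably why the lab’s version can map elements in parallel.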

Next up, let’s talk about the Mapper’s role. The Mapper is application-specific. By that I mean, if we were distributing a system to compete with Hadoop, we would build one MatrixFramework as well as we could and ship it. Users of our system would then write any number of custom Mappers and Reducers to solve their particular problems.

The Mapper is passed two parameters: a context, which knows how to receive (key, value) pairs, and a piece of data to process. By “process” we simply mean figuring out which (key, value) pairs to emit and emitting them. We have built a number of Mappers in this course. The WordCountMapper processes a line of text and emits a (key=word, value=1) pair for each word in the line. The MutualFriendsMapper emits (key=friendPair, value=the account’s friend list) for each of the passed-in account’s friends. The CardMapper emits a (key=suit, value=numericValue) pair for each card it is passed. The CholeraMapper is passed the residence location of someone who died in the outbreak and emits a single (key, value) pair: (key=the water pump closest to that location, value=1).
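To illustrate how narrow the Mapper’s job is, here is a hedged sketch of a word-count Mapper. The MapContext and Mapper interfaces are hypothetical stand-ins for the lab’s types, and emissionsFor() is an invented helper that records emissions with a throwaway context so the behavior can be inspected.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class WordCountMapperSketch {

    // Hypothetical stand-ins for the lab's MapContext and Mapper.
    interface MapContext<K, V> {
        void emit(K key, V value);
    }

    interface Mapper<E, K, V> {
        void map(MapContext<K, V> context, E item);
    }

    // Emits (key=word, value=1) for every word in the line, duplicates included.
    static class WordCountMapper implements Mapper<String[], String, Integer> {
        @Override
        public void map(MapContext<String, Integer> context, String[] lineOfWords) {
            for (String word : lineOfWords) {
                context.emit(word, 1);
            }
        }
    }

    // Hypothetical helper: records every emission, in order, for inspection.
    static List<Map.Entry<String, Integer>> emissionsFor(String[] lineOfWords) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        new WordCountMapper().map(
                (key, value) -> emitted.add(new AbstractMap.SimpleEntry<>(key, value)),
                lineOfWords);
        return emitted;
    }
}
```

A repeated word is emitted once per occurrence; combining those 1s is left entirely to later stages.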

As far as the Context is concerned, its role is to handle whatever (key, value) pairs the Mapper emits at it. For the SimpleFramework, we assume that no context will receive the same key twice, so we can simply associate the key with its value in the contained dictionary. For the MatrixFramework, it is a bit more complicated. First, we have to group the emissions as they arrive (as there can be duplicates), and second, we need to separate them into the correct column based on their key’s hashCode().
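The MatrixFramework context’s two duties, grouping on arrival and choosing a column by hashCode(), can be sketched as follows. GroupingContextSketch and its methods are hypothetical names; using Math.floorMod to keep the column index non-negative when hashCode() is negative is a design choice of this sketch, not necessarily what the lab requires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a group-stage-skipping context; not the lab's class.
class GroupingContextSketch<K, V> {

    // One grouped map per column; a given key always lands in the same column.
    private final List<Map<K, List<V>>> columns = new ArrayList<>();

    GroupingContextSketch(int columnCount) {
        for (int i = 0; i < columnCount; i++) {
            columns.add(new HashMap<>());
        }
    }

    // Column chosen from the key's hashCode; floorMod avoids negative indices.
    int columnIndexOf(K key) {
        return Math.floorMod(key.hashCode(), columns.size());
    }

    // Group as we go: duplicate keys are expected, so append to the key's list.
    void emit(K key, V value) {
        Map<K, List<V>> column = columns.get(columnIndexOf(key));
        column.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    Map<K, List<V>> getColumn(int index) {
        return columns.get(index);
    }
}
```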

As far as grouping goes, writing groupAll() in the SimpleFramework should set you up well for the grouping responsibilities of GroupStageSkippingMapContext. At a high level, the signature of groupAll() tells you what you need to do. You are passed an array of IndividualMapContexts, each containing a dictionary of (key, value) pairs. You must return a single dictionary that contains every key you were passed, each associated with a list of all the values you encountered for it. The array of Contexts might obscure it a little, but at a high level one can think of it this way: groupAll() is passed something akin to List<Map<K,V>> and must return a single Map<K,List<V>>.
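Seen as List<Map<K,V>> in and Map<K,List<V>> out, groupAll() comes down to a nested loop. This sketch assumes each IndividualMapContext’s dictionary can be exposed as a plain Map<K,V>; GroupAllSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupAllSketch {

    // groupAll in its simplest form: List<Map<K,V>> in, Map<K,List<V>> out.
    static <K, V> Map<K, List<V>> groupAll(List<Map<K, V>> mapped) {
        Map<K, List<V>> grouped = new HashMap<>();
        for (Map<K, V> single : mapped) {
            for (Map.Entry<K, V> entry : single.entrySet()) {
                // Every value encountered for a key is appended to that key's list.
                grouped.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
            }
        }
        return grouped;
    }
}
```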

Part 1 Simple Framework

class StudentIndividualMapContext

public void emit(K key, V value)

Word Count Application

class WordCountMapper

public void map(MapContext<String, Integer> context, String[] lineOfWords)

class WordCountReducer

public FinishAccumulator<Integer> createAccumulator()

Test

class TestWordCountApplicationWithInstructorFramework

Mutual Friends Application

class MutualFriendsMapper

public void map(MapContext<OrderedPair<AccountId>, List<AccountId>> context, Account account)
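A sketch of the MutualFriendsMapper behavior described earlier, with heavy simplifications: account ids are plain Strings, and the OrderedPair and Account classes below are hypothetical stand-ins for the lab’s types. The sketch assumes OrderedPair stores its two ids in a canonical (sorted) order so that both accounts in a friendship emit the same key; check the lab’s OrderedPair before relying on that.

```java
import java.util.List;
import java.util.Objects;

class MutualFriendsMapperSketch {

    interface MapContext<K, V> {
        void emit(K key, V value);
    }

    // Hypothetical pair: the smaller id is stored first, so (a, b) equals (b, a).
    static final class OrderedPair {
        final String first;
        final String second;

        OrderedPair(String a, String b) {
            boolean inOrder = a.compareTo(b) <= 0;
            this.first = inOrder ? a : b;
            this.second = inOrder ? b : a;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof OrderedPair)) return false;
            OrderedPair p = (OrderedPair) o;
            return first.equals(p.first) && second.equals(p.second);
        }

        @Override
        public int hashCode() {
            return Objects.hash(first, second);
        }
    }

    // Hypothetical account: an id plus the ids of its friends.
    static final class Account {
        final String id;
        final List<String> friendIds;

        Account(String id, List<String> friendIds) {
            this.id = id;
            this.friendIds = friendIds;
        }
    }

    // For each friend: emit (key = pair of this account and that friend, value = this account's friend list).
    static void map(MapContext<OrderedPair, List<String>> context, Account account) {
        for (String friendId : account.friendIds) {
            context.emit(new OrderedPair(account.id, friendId), account.friendIds);
        }
    }
}
```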

class MutualFriendsReducer

public FinishAccumulator<List<AccountId>> createAccumulator()

See: Creating a custom reduction accumulator and ListIntersectionReducer<T>
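One possible shape for a list-intersection accumulator, assuming an accumulator is something you put() values into and later get() a single result from. The Accumulator interface here is a hypothetical stand-in, not the lab’s FinishAccumulator API. For mutual friends, intersecting the friend lists emitted under one pair key leaves exactly the friends the two accounts share.

```java
import java.util.ArrayList;
import java.util.List;

class ListIntersectionSketch {

    // Hypothetical accumulator shape: put values in one at a time, read one result.
    interface Accumulator<T> {
        void put(T value);
        T get();
    }

    // Keeps only the elements present in every list it has been given.
    static final class ListIntersectionAccumulator<T> implements Accumulator<List<T>> {
        private List<T> intersection = null; // null until the first list arrives

        @Override
        public synchronized void put(List<T> list) {
            if (intersection == null) {
                intersection = new ArrayList<>(list);
            } else {
                intersection.retainAll(list);
            }
        }

        @Override
        public synchronized List<T> get() {
            return intersection == null ? new ArrayList<>() : intersection;
        }
    }
}
```

put() and get() are synchronized only as a simple precaution in case values arrive from concurrent tasks.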

Test

class TestMutualFriendsApplicationWithInstructorFramework

class WordCountConcreteStaticMapReduce (optional but encouraged)

mapAll

private static IndividualMapContext<String, Integer>[] mapAll(String[][] S, WordCountMapper mapper) throws SuspendableException

groupAll

private static Map<String, List<Integer>> groupAll(IndividualMapContext<String, Integer>[] f_of_S)

reduceAll

private static Map<String, Integer> reduceAll(Map<String, List<Integer>> grouped_f_of_S, WordCountReducer reducer) throws SuspendableException

Test

class TestWordCountConcreteStaticFramework

class MutualFriendsConcreteStaticMapReduce (optional but encouraged)

mapAll

private static IndividualMapContext<OrderedPair<AccountId>, List<AccountId>>[] mapAll(Account[] S, MutualFriendsMapper mapper) throws SuspendableException


groupAll

private static Map<OrderedPair<AccountId>, List<List<AccountId>>> groupAll(IndividualMapContext<OrderedPair<AccountId>, List<AccountId>>[] f_of_S)

reduceAll

private static Map<OrderedPair<AccountId>, List<AccountId>> reduceAll(Map<OrderedPair<AccountId>, List<List<AccountId>>> grouped_f_of_S, MutualFriendsReducer reducer) throws SuspendableException

Test

class TestMutualFriendsConcreteStaticFramework

class SimpleMapReduceFramework<E,K,V>

mapAll

private IndividualMapContext<K,V>[] mapAll(E[] S, Mapper<E,K,V> mapper) throws SuspendableException

groupAll

private Map<K, List<V>> groupAll(IndividualMapContext<K,V>[] f_of_S)

reduceAll

private Map<K, V> reduceAll(Map<K, List<V>> grouped_f_of_S, Reducer<V> reducer) throws SuspendableException
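reduceAll() in the SimpleMapReduceFramework amounts to one reduce() call per key. This is a sequential sketch with a hypothetical Reducer stand-in and no SuspendableException; since keys are independent of one another, the per-key calls could also run in parallel.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReduceAllSketch {

    // Hypothetical stand-in for the lab's Reducer: collapses one key's values into one value.
    interface Reducer<V> {
        V reduce(List<V> values);
    }

    // reduceAll: exactly one reduce() call per key.
    static <K, V> Map<K, V> reduceAll(Map<K, List<V>> grouped, Reducer<V> reducer) {
        Map<K, V> result = new HashMap<>();
        for (Map.Entry<K, List<V>> entry : grouped.entrySet()) {
            result.put(entry.getKey(), reducer.reduce(entry.getValue()));
        }
        return result;
    }
}
```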

Test

class TestMutualFriendsSolution

FinishAccumulatorBasedReducer

reduce

default V reduce(List<V> list) throws SuspendableException
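One plausible reading of a default reduce() for an accumulator-based reducer: create a fresh accumulator, put every value into it, and return its result. The interfaces below are hypothetical stand-ins; in particular, the course’s FinishAccumulator is presumably tied to a finish scope, which this sequential sketch does not model. A summing accumulator shows how little a word-count reducer then needs to supply.

```java
import java.util.List;

class AccumulatorReducerSketch {

    // Hypothetical accumulator shape (put values, then read the result once).
    interface Accumulator<V> {
        void put(V value);
        V get();
    }

    // A reducer defined entirely by the accumulator it creates.
    interface AccumulatorBasedReducer<V> {
        Accumulator<V> createAccumulator();

        // Fresh accumulator per call, feed it every value, return its result.
        default V reduce(List<V> list) {
            Accumulator<V> accumulator = createAccumulator();
            for (V value : list) {
                accumulator.put(value);
            }
            return accumulator.get();
        }
    }

    // Word count: a summing accumulator is all this reducer has to supply.
    static final class WordCountReducer implements AccumulatorBasedReducer<Integer> {
        @Override
        public Accumulator<Integer> createAccumulator() {
            return new Accumulator<Integer>() {
                private int sum = 0;

                @Override
                public void put(Integer value) {
                    sum += value;
                }

                @Override
                public Integer get() {
                    return sum;
                }
            };
        }
    }
}
```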

Part 2 Matrix Framework

class StudentGroupStageSkippingMapContext<K,V>

emit

public void emit(K key, V value)

class MatrixMapReduceFramework<E,K,V>

mapGroupAll

private GroupStageSkippingMapContext<K, V>[] mapGroupAll(E[] S, Mapper<E, K, V> mapper) throws SuspendableException
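mapGroupAll() can be sketched as an n-way split of the input into tasks, each owning its own grouping context (one row of the matrix) whose columns are chosen by hashCode(). All names below are hypothetical stand-ins, the tasks run sequentially here, and a List is returned instead of an array to avoid generic array creation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapGroupAllSketch {

    interface MapContext<K, V> {
        void emit(K key, V value);
    }

    interface Mapper<E, K, V> {
        void map(MapContext<K, V> context, E item);
    }

    // Hypothetical grouping context: one Map<K, List<V>> per column, chosen by hashCode.
    static final class GroupingContext<K, V> implements MapContext<K, V> {
        final List<Map<K, List<V>>> columns = new ArrayList<>();

        GroupingContext(int columnCount) {
            for (int i = 0; i < columnCount; i++) {
                columns.add(new HashMap<>());
            }
        }

        @Override
        public void emit(K key, V value) {
            int column = Math.floorMod(key.hashCode(), columns.size());
            columns.get(column).computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    // n-way split: task t handles a contiguous chunk and owns one context (one row).
    static <E, K, V> List<GroupingContext<K, V>> mapGroupAll(
            E[] items, Mapper<E, K, V> mapper, int taskCount, int columnCount) {
        List<GroupingContext<K, V>> rows = new ArrayList<>();
        for (int task = 0; task < taskCount; task++) {
            GroupingContext<K, V> context = new GroupingContext<>(columnCount);
            int start = task * items.length / taskCount;
            int end = (task + 1) * items.length / taskCount;
            for (int i = start; i < end; i++) {
                mapper.map(context, items[i]); // still just: call map() for each item
            }
            rows.add(context);
        }
        return rows;
    }
}
```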

reduceAll

private Map<K,V> reduceAll(GroupStageSkippingMapContext<K, V>[] grouped_f_of_S, Reducer<V> reducer) throws SuspendableException
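The matrix reduceAll() then works column by column: gather column c from every row, concatenate each key’s lists, and reduce. Because a key always hashes to the same column, no key is split across columns, so columns can be reduced independently of one another. In this sketch the rows are modeled as a plain List<List<Map<K, List<V>>>> and Reducer is a hypothetical stand-in; the lab’s version works with GroupStageSkippingMapContext objects instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MatrixReduceAllSketch {

    // Hypothetical stand-in for the lab's Reducer.
    interface Reducer<V> {
        V reduce(List<V> values);
    }

    // rows.get(r).get(c) is row r's (one map task's) grouped map for column c.
    static <K, V> Map<K, V> reduceAll(List<List<Map<K, List<V>>>> rows, int columnCount, Reducer<V> reducer) {
        Map<K, V> result = new HashMap<>();
        for (int c = 0; c < columnCount; c++) {
            // Gather column c from every row; a key only ever appears in one column.
            Map<K, List<V>> merged = new HashMap<>();
            for (List<Map<K, List<V>>> row : rows) {
                for (Map.Entry<K, List<V>> entry : row.get(c).entrySet()) {
                    merged.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).addAll(entry.getValue());
                }
            }
            // Reduce each key that lives in this column.
            for (Map.Entry<K, List<V>> entry : merged.entrySet()) {
                result.put(entry.getKey(), reducer.reduce(entry.getValue()));
            }
        }
        return result;
    }
}
```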