Difference between revisions of "MapReduce Frameworks Lab"

From CSE231 Wiki
Jump to navigation Jump to search
m (Cosgroved moved page MapReduce Assignment to MapReduce)
Line 1: Line 1:
The MapReduce assignment has many different pieces that come together to solve problems.  So much so that in the end writing each individual class or method can be less challenging than simply figuring out what each piece’s role should be in the larger puzzle.
+
=Background=
  
At a high level we have a framework, a mapper, a reducer, and some data we would like to process.
+
As the name suggests, MapReduce refers to the process of mapping then reducing some stream of data. At its core, all a MapReduce algorithm entails is transforming a list of one kind of item before collecting those items and reducing them down to a single value using some computation. As you can probably tell, the concept of MapReduce is extremely general and can apply to a wide berth of problems. In this assignment, we will use MapReduce to simulate the Facebook mutual friends algorithm (finding the number of mutual friends between two people) and word count algorithm. As studios, you will use MapReduce to find which infected well(s) is causing an outbreak of cholera in historic London and to map a deck of cards.
  
Let’s start with the Framework.  The first job of the Framework is mapAll() which unsurprisingly calls map() on the Mapper for every piece of data.  That’s it.  Don’t overcomplicate it.  The framework leaves all of the details of what (key, value) pairs to emit to the Mapper.  Just call map() for every piece of data.  Granted, it does need to create an instance of Context to receive those emissions.  For the SimpleFramework we end up creating a context that holds a single dictionary to handle all of the key, value pairs the Mapper throws at it.  This IndividualContext leaves grouping to the next stage so it can simply associate the keys with the values and be done. For the MatrixFramework, we end up mapping and grouping at once so its GroupStageSkippingContext is a bit more complicated, but the mapAndGroupAll() method on MatrixFramework stays nice and simple.  It call map() on the Mapper for each piece of data.  As stated before, don’t overcomplicate it.  You will want to create an n-way-split-esque separation of the data into tasks, each with its own GroupStageSkippingContext, but don’t lose sight of the goal: call map on the Mapper for each piece of the data.
+
For more information on the general concept of MapReduce, refer to [https://stackoverflow.com/questions/28982/simple-explanation-of-mapreduce this] article.
  
Next up, let’s talk about the Mapper’s role.  The Mapper is application specific.  By that I mean, if we were distributing a system to compete with Hadoop we would build one MatrixFramework as well as we could and ship it.  Users of our system would then write any number of custom Mappers and Reducers to solve their particular problems.
+
=Where to Start=
  
The Mapper is passed two parameters: a context which knows how to receive (key,value) pairs and a piece of data to process. By “process” we mean simply figure out what (key, value) pairs it wants to emit and emit them. We have built a number of Mappers in this course. The WordCountMapper processes a line of text and emits pairs of (key=word, value=1) for each word in the line.  The MutualFriendsMapper emits (key=friendPair, value=account’sFriendList) for each of the passed in account’s friends.  The CardMapper emits pairs of (key=suit, value=numericValue) for each card it is passed. The CholoraMapper is passed a residence location for someone who died in the outbreak and emits a single (key, value) pair: (key=the water pump closest to that location, value=1).
+
You can find all of the relevant files for this assignment under the '''mapreduce''' directory. From there, all of the classes you will need to implement can be found under <code>mapreduce.assignment</code> or <code>mapreduce.studio</code>. The '''core''' directories are utility and building block classes we created for you and the '''viz''' directories are visualization apps that might help you understand your code from a visual standpoint. Take a look at these classes to get a better understanding of how to use them for your assignment.
  
As far as the Context is concerned, its role is to handle whatever key,value pairs the Mapper emits at it.  For the SimpleFramework we assume that no context will be emitted the same key twice so we can simply associate the key with its value in the contained dictionary.  For the MatrixFramework it is a bit more complicated.  First we have to group the emissions as they arrive (as there can be duplicates) and second we need to separate them into the correct column based on their key’s hashCode.
+
==Cards MapReduce Studio==
  
As far as grouping goes, hopefully writing groupAll() in the SimpleFramework will set you up well for the grouping as you receive responsibilities of GroupStageSkippingMapContext. At a high level, looking at the signature of groupAll() tells you what you need to do. You are passed an array of IndividualMapContexts, each containing a dictionary of key, value pairs. You must return a single Dictionary that contains all of the keys you were passed in, each associated with a list of all the values you encounter. The array of Contexts might obscure it a little, but at a high level one can think of it as: groupAll() is passed something akin to  List<Map<K,V>> and must return a single Map<K,List<V>>.
+
Navigate to the <code>CardMapper.java</code> class and look at the map method. In this part of the studio, you are going to create a mapper for a given deck of cards. The mapper should accept a card’s suit and numeric value through the use of a BiConsumer. Iterate through all of the cards in the given deck and ensure they are valid cards. If they are, then map them!
  
=Part 1 Simple Framework=
+
Hint: the <code>Rank</code> class has a useful method that checks whether a card is valid.
==[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/simple/StudentIndividualMapContext.html class StudentIndividualMapContext]==
 
public void emit(K key, V value)
 
  
==Word Count Application==
+
==Cholera MapReduce Studio==
===[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/apps/wordcount/WordCountMapper.html class WordCountMapper]===
 
public void map(MapContext<String, Integer> context, String[] lineOfWords)
 
  
===[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/apps/wordcount/WordCountReducer.html class WordCountReducer]===
+
Navigate to the <code>CholeraMapper.java</code> class and look at the map method. In this part of the studio, you will attempt to find which water pumps are infected with cholera based on a given location of a reported case. You will do this by checking which specific WaterPump is closest to a given location and map that value to a BiConsumer. The BiConsumer will accept two arguments: the closest WaterPump and the number of occurrences to add onto the map. To find the closest WaterPump, you should go through a database of all available water pumps and compare the distances between the pumps and the location.
public FinishAccumulator<Integer> createAccumulator()
 
  
===Test===
+
Hint: there are some useful methods in the WaterPump and Location classes that might help you with the studio.
class TestWordCountApplicationWithInstructorFramework
 
  
==Mutual Friends Application==
+
==Mutual Friends MapReduce Assignment==
===[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/apps/friends/MutualFriendsMapper.html class MutualFriendsMapper]===
 
public void map(MapContext<OrderedPair<AccountId>, List<AccountId>> context, Account account)
 
  
===[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/apps/friends/MutualFriendsReducer.html class MutualFriendsReducer]===
+
The goal of this implementation is to create Facebook’s mutual friends algorithm using MapReduce. To accomplish this, you will need to create the mapper yourself, but the reducer has been provided to you. Navigate to the <code>MutualFriendsMapper.java</code> class. In this class, you will specifically define how the framework accomplishes the map method.
public FinishAccumulator<List<AccountId>> createAccumulator()
 
  
see: [[Habanero#FinishAccumulator_Creation|creating a custom reduction accumulator]] and [http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/edu/wustl/cse231s/mapreduce/apps/friends/ListIntersectionReducer.html ListIntersectionReducer<T>]
+
The only method you will need to alter is the map method. In this method, you will need to map every combination of the account holder to his/her friends. In order to do this, create ordered pairs of the given account’s ID and the IDs of the account holder’s friends. You must then feed each individual ordered pair into the keyValuePairConsumer along with the full set of the account holder’s friends.
  
===Test===
+
Hint: check out the methods in the Account class for help.
class TestMutualFriendsApplicationWithInstructorFramework
 
  
==class WordCountConcreteStaticMapReduce (optional but encouraged)==
+
==Word Count MapReduce Assignment==
===mapAll===
 
private static IndividualMapContext<String, Integer>[] mapAll(String[][] S, WordCountMapper mapper) throws SuspendableException
 
===groupAll===
 
private static Map<String, List<Integer>> groupAll(IndividualMapContext<String, Integer>[] f_of_S)
 
===reduceAll===
 
private static Map<String, Integer> reduceAll(Map<String, List<Integer>> grouped_f_of_S, WordCountReducer reducer) throws SuspendableException
 
===Test===
 
class TestWordCountConcreteStaticFramework
 
  
==class MutualFriendsConcreteStaticMapReduce (optional but encouraged)==
+
The goal of this implementation is to count the number of times a word appears in a given text, using MapReduce. To accomplish this, you will need to create both the mapper and the reducer. Navigate to the <code>WordCountMapper.java</code> and <code>IntegerSumClassicReducer.java</code> classes. You will specifically define how the framework accomplishes the map and reduce methods.
===mapAll===
 
private static IndividualMapContext<OrderedPair<AccountId>, List<AccountId>>[] mapAll(Account[] S, MutualFriendsMapper mapper) throws SuspendableException
 
  
[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/hw4/mapAllIndividual.svg diagram]
+
===WordCountMapper===
  
===groupAll===
+
The only method you will need to alter is the map method. In this method, you need to record every instance of a given word and feed it into the keyValuePairConsumer. To do this, access all of the words in the TextSection and if the length of the word is greater than zero (meaning it is not just blank space), convert it into lower-case and accept it into the consumer.
private static Map<OrderedPair<AccountId>, List<List<AccountId>>> groupAll(IndividualMapContext<OrderedPair<AccountId>, List<AccountId>>[] f_of_S)
 
===reduceAll===
 
private static Map<OrderedPair<AccountId>, List<AccountId>> reduceAll(Map<OrderedPair<AccountId>, List<List<AccountId>>> grouped_f_of_S, MutualFriendsReducer reducer) throws SuspendableException
 
===Test===
 
class TestMutualFriendsConcreteStaticFramework
 
  
==[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/simple/SimpleMapReduceFramework.html SimpleMapReduceFramework<E,K,V>]==
+
Hint: Look at the methods in TextSection and the toLowerCase() method for strings for assistance.
===mapAll===
 
private IndividualMapContext<K,V>[] mapAll(E[] S, Mapper<E,K,V> mapper) throws SuspendableException
 
===groupAll===
 
private Map<K, List<V>> groupAll(IndividualMapContext<K,V>[] f_of_S)
 
===reduceAll===
 
private Map<K, V> reduceAll(Map<K, List<V>> grouped_f_of_S, Reducer<V> reducer) throws SuspendableException
 
===Test===
 
class TestMutualFriendsSolution
 
  
==[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/edu/wustl/cse231s/mapreduce/FinishAccumulatorBasedReducer.html FinishAccumulatorBasedReducer]==
+
===IntegerSumClassicReducer===
===reduce===
 
default V reduce( List<V> list ) throws SuspendableException
 
  
=Part 2 Matrix Framework=
+
The only method you will need to alter is the apply method. All you need to do is sum up the value of a list of integers and return that sum value.
==[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/matrix/StudentGroupStageSkippingMapContext.html class StudentGroupStageSkippingMapContext<K,V>]==
 
===emit===
 
public void emit(K key, V value)
 
  
==[http://www.cse.wustl.edu/~cosgroved/courses/cse231/s17/javadocs/student/solution/mapreduce/matrix/MatrixMapReduceFramework.html class MatrixMapReduceFramework<E,K,V>]==
+
==Simple MapReduce Framework==
===mapGroupAll===
+
 
private GroupStageSkippingMapContext<K, V>[] mapGroupAll(E[] S, Mapper<E, K, V> mapper) throws SuspendableException
+
Navigate to the <code>SimpleMapReduceFramework.java</code> class and there will be three methods for you to complete: mapAll, accumulateAll, and finishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.
===reduceAll===
+
 
private Map<K,V> reduceAll(GroupStageSkippingMapContext<K, V>[] grouped_f_of_S, Reducer<V> reducer) throws SuspendableException
+
===mapAll Method===
 +
 
 +
With this method, you will map all of the elements of an array of data into a new array of equivalent size consisting of a list of key value pairs. In order to do this, you must define the map() method for the mapper by specifying that it should add a new key value pair to a previously empty list. This list should then be added to the array of lists you previously defined, therefore completing the mapping stage of MapReduce. This should all be done in parallel.
 +
 
 +
Hint: if you are creating an array of lists equivalent in size to the original array, your lists should probably consist of just one item.
 +
 
 +
===accumulateAll Method===
 +
 
 +
This middle step is often excluded in more advanced MapReduce applications. When run in parallel, it is the only step of the framework that must be completed sequentially. In the matrix framework implementation, we will do away with this step altogether for the sake of performance.
 +
 
 +
In this method, you will take in the array of lists you previously created and accumulate the key value pairs in the lists into a newly defined map. Unlike the mapping phase, the map must account for the possibility of duplicates. To help deal with this issue, you must make use of the Collector provided to you. More specifically, access the accumulator in the collector by calling the <code>accumulator()</code> method and accept the key/value pair when you add it to the map. You probably noticed that the method must return a map of <K, A>, which differs from the <K, V> generics fed into the method. The framework is designed this way as the data originally fed into the mapping stage can be turned into a different form of data before reaching the reduce stage. Although we will not do this with any of our implementations, we designed the framework to allow this. In order to access the correct value for the map if the key has no associated value yet, use the supplier associated with the Collector with the <code>supplier()</code> method.
 +
 
 +
Hint: Look into the <code>compute()</code> method for maps.
 +
 
 +
===finishAll Method===
 +
 
 +
This final step reduces the accumulated data and returns the final map in its reduced form. Again, you may notice that the method returns a map of <K, R> instead of the <K, A> which was returned in the accumulateAll method. This happens for the exact same reason as the accumulateAll method, as the framework is designed to handle cases in which the reduced data differs in type from the accumulated data.
 +
 
 +
To reduce the data down, use the map returned from the accumulateAll stage and put the results of the reduction into a new map. The provided Collector will come in extremely handy for this stage, more specifically the finisher which can be called using the <code>finisher()</code> method. This step should run in parallel and will probably be the easiest of the three methods.
 +
 
 +
Hint: Use the <code>entrySet()</code> method to get all of the entries in the given map and remember to use a ConcurrentHashMap instead of a regular HashMap to ensure the method can run in parallel.
 +
 
 +
==Matrix MapReduce Framework==
 +
 
 +
Navigate to the <code>MatrixMapReduceFramework.java</code> class and there will be two methods for you to complete: mapAndAccumulateAll and combineAndFinishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.
 +
 
 +
The matrix framework is much more complex than the simple framework, but it boosts performance by grouping the map and accumulate stages so that everything can run in parallel. It does so by slicing up the given data into the specified mapTaskCount number of slices and assigns a reduce task number to each entry using the provided getReduceIndex() method. This, in effect, creates a matrix of maps, hence the name of the framework. In the combineAndFinishAll stage, the matrix comes in handy by allowing us to go directly down the matrix (as each key is essentially grouped into a bucket), combining and reducing elements all-in-one. This concept was explained in more depth during class.
 +
 
 +
===mapAndAccumulateAll===
 +
 
 +
In this stage, you will map and accumulate a given array of data into a matrix of maps. This method should run in parallel while combining the map and accumulate portions of the simple framework (which we recommend you attempt first). As mentioned previously, the input should be sliced into a mapTaskCount number of slices and then mapped/accumulated into its rightful spot in the matrix. Although you can slice up the data into chunks yourself, we recommend using the <code>Slice</code> and <code>Slices</code> classes introduced earlier in the course.
 +
 
 +
For each slice, define that the mapper should map the input into its rightful spot in the matrix and accumulate it into that specific map. Essentially, you will need to nestle the actions of the accumulate method into the mapper. In order to find where the input should go in the matrix, remember that each slice keeps track of its position relative to the other slices and the getReduceIndex method, mentioned above.
 +
 
 +
Hint: The number of rows should match the number of slices.
 +
 
 +
===combineAndFinishAll===
 +
 
 +
In this stage, you will take the matrix you just completed and combine all of the separate rows down to one array. Afterwards, you will convert this combined array of maps into one final map. This method should run in parallel.
 +
 
 +
As mentioned previously, you should go directly down the matrix to access the same bucket across the different slices you created in the mapAndAccumulateAll step. For all of the maps in a column, you should go through each entry and combine it down into one row. You will need to make use of the Collector’s finisher again, but you will also need to make use of the combiner. You can access the Collector’s combiner using the <code>combiner()</code> method. Although the combine step differs from the simple framework, the finish step should mirror what you did previously.
 +
 
 +
Hint: You can use the provided MultiWrapMap class to return the final row as a valid output.

Revision as of 21:27, 7 August 2017

Background

As the name suggests, MapReduce refers to the process of mapping then reducing some stream of data. At its core, all a MapReduce algorithm entails is transforming a list of one kind of item before collecting those items and reducing them down to a single value using some computation. As you can probably tell, the concept of MapReduce is extremely general and can apply to a wide berth of problems. In this assignment, we will use MapReduce to simulate the Facebook mutual friends algorithm (finding the number of mutual friends between two people) and word count algorithm. As studios, you will use MapReduce to find which infected well(s) is causing an outbreak of cholera in historic London and to map a deck of cards.

For more information on the general concept of MapReduce, refer to this article.

Where to Start

You can find all of the relevant files for this assignment under the mapreduce directory. From there, all of the classes you will need to implement can be found under mapreduce.assignment or mapreduce.studio. The core directories are utility and building block classes we created for you and the viz directories are visualization apps that might help you understand your code from a visual standpoint. Take a look at these classes to get a better understanding of how to use them for your assignment.

Cards MapReduce Studio

Navigate to the CardMapper.java class and look at the map method. In this part of the studio, you are going to create a mapper for a given deck of cards. The mapper should accept a card’s suit and numeric value through the use of a BiConsumer. Iterate through all of the cards in the given deck and ensure they are valid cards. If they are, then map them!

Hint: the Rank class has a useful method that checks whether a card is valid.

Cholera MapReduce Studio

Navigate to the CholeraMapper.java class and look at the map method. In this part of the studio, you will attempt to find which water pumps are infected with cholera based on a given location of a reported case. You will do this by checking which specific WaterPump is closest to a given location and map that value to a BiConsumer. The BiConsumer will accept two arguments: the closest WaterPump and the number of occurrences to add onto the map. To find the closest WaterPump, you should go through a database of all available water pumps and compare the distances between the pumps and the location.

Hint: there are some useful methods in the WaterPump and Location classes that might help you with the studio.

Mutual Friends MapReduce Assignment

The goal of this implementation is to create Facebook’s mutual friends algorithm using MapReduce. To accomplish this, you will need to create the mapper yourself, but the reducer has been provided to you. Navigate to the MutualFriendsMapper.java class. In this class, you will specifically define how the framework accomplishes the map method.

The only method you will need to alter is the map method. In this method, you will need to map every combination of the account holder to his/her friends. In order to do this, create ordered pairs of the given account’s ID and the IDs of the account holder’s friends. You must then feed each individual ordered pair into the keyValuePairConsumer along with the full set of the account holder’s friends.

Hint: check out the methods in the Account class for help.

Word Count MapReduce Assignment

The goal of this implementation is to count the number of times a word appears in a given text, using MapReduce. To accomplish this, you will need to create both the mapper and the reducer. Navigate to the WordCountMapper.java and IntegerSumClassicReducer.java classes. You will specifically define how the framework accomplishes the map and reduce methods.

WordCountMapper

The only method you will need to alter is the map method. In this method, you need to record every instance of a given word and feed it into the keyValuePairConsumer. To do this, access all of the words in the TextSection and if the length of the word is greater than zero (meaning it is not just blank space), convert it into lower-case and accept it into the consumer.

Hint: Look at the methods in TextSection and the toLowerCase() method for strings for assistance.

IntegerSumClassicReducer

The only method you will need to alter is the apply method. All you need to do is sum up the value of a list of integers and return that sum value.

Simple MapReduce Framework

Navigate to the SimpleMapReduceFramework.java class and there will be three methods for you to complete: mapAll, accumulateAll, and finishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.

mapAll Method

With this method, you will map all of the elements of an array of data into a new array of equivalent size consisting of a list of key value pairs. In order to do this, you must define the map() method for the mapper by specifying that it should add a new key value pair to a previously empty list. This list should then be added to the array of lists you previously defined, therefore completing the mapping stage of MapReduce. This should all be done in parallel.

Hint: if you are creating an array of lists equivalent in size to the original array, your lists should probably consist of just one item.

accumulateAll Method

This middle step is often excluded in more advanced MapReduce applications. When run in parallel, it is the only step of the framework that must be completed sequentially. In the matrix framework implementation, we will do away with this step altogether for the sake of performance.

In this method, you will take in the array of lists you previously created and accumulate the key value pairs in the lists into a newly defined map. Unlike the mapping phase, the map must account for the possibility of duplicates. To help deal with this issue, you must make use of the Collector provided to you. More specifically, access the accumulator in the collector by calling the accumulator() method and accept the key/value pair when you add it to the map. You probably noticed that the method must return a map of <K, A>, which differs from the <K, V> generics fed into the method. The framework is designed this way as the data originally fed into the mapping stage can be turned into a different form of data before reaching the reduce stage. Although we will not do this with any of our implementations, we designed the framework to allow this. In order to access the correct value for the map if the key has no associated value yet, use the supplier associated with the Collector with the supplier() method.

Hint: Look into the compute() method for maps.

finishAll Method

This final step reduces the accumulated data and returns the final map in its reduced form. Again, you may notice that the method returns a map of <K, R> instead of the <K, A> which was returned in the accumulateAll method. This happens for the exact same reason as the accumulateAll method, as the framework is designed to handle cases in which the reduced data differs in type from the accumulated data.

To reduce the data down, use the map returned from the accumulateAll stage and put the results of the reduction into a new map. The provided Collector will come in extremely handy for this stage, more specifically the finisher which can be called using the finisher() method. This step should run in parallel and will probably be the easiest of the three methods.

Hint: Use the entrySet() method to get all of the entries in the given map and remember to use a ConcurrentHashMap instead of a regular HashMap to ensure the method can run in parallel.

Matrix MapReduce Framework

Navigate to the MatrixMapReduceFramework.java class and there will be two methods for you to complete: mapAndAccumulateAll and combineAndFinishAll. These frameworks are meant to be extremely general and applied to more specific uses of MapReduce.

The matrix framework is much more complex than the simple framework, but it boosts performance by grouping the map and accumulate stages so that everything can run in parallel. It does so by slicing up the given data into the specified mapTaskCount number of slices and assigns a reduce task number to each entry using the provided getReduceIndex() method. This, in effect, creates a matrix of maps, hence the name of the framework. In the combineAndFinishAll stage, the matrix comes in handy by allowing us to go directly down the matrix (as each key is essentially grouped into a bucket), combining and reducing elements all-in-one. This concept was explained in more depth during class.

mapAndAccumulateAll

In this stage, you will map and accumulate a given array of data into a matrix of maps. This method should run in parallel while combining the map and accumulate portions of the simple framework (which we recommend you attempt first). As mentioned previously, the input should be sliced into a mapTaskCount number of slices and then mapped/accumulated into its rightful spot in the matrix. Although you can slice up the data into chunks yourself, we recommend using the Slice and Slices classes introduced earlier in the course.

For each slice, define that the mapper should map the input into its rightful spot in the matrix and accumulate it into that specific map. Essentially, you will need to nestle the actions of the accumulate method into the mapper. In order to find where the input should go in the matrix, remember that each slice keeps track of its position relative to the other slices and the getReduceIndex method, mentioned above.

Hint: The number of rows should match the number of slices.

combineAndFinishAll

In this stage, you will take the matrix you just completed and combine all of the separate rows down to one array. Afterwards, you will convert this combined array of maps into one final map. This method should run in parallel.

As mentioned previously, you should go directly down the matrix to access the same bucket across the different slices you created in the mapAndAccumulateAll step. For all of the maps in a column, you should go through each entry and combine it down into one row. You will need to make use of the Collector’s finisher again, but you will also need to make use of the combiner. You can access the Collector’s combiner using the combiner() method. Although the combine step differs from the simple framework, the finish step should mirror what you did previously.

Hint: You can use the provided MultiWrapMap class to return the final row as a valid output.