Welcome to the fifth article in the series of Apache Spark tutorials. In this article, you will learn the application of the flatMap transform operation. After the introduction to the flatMap operation, a sample Spark application is developed to list all action movies from the MovieLens dataset.

In the previous articles, we have used the map transform operation, which transforms an entity into another entity in a one-to-one manner. For example, suppose you have a String RDD named lines; applying lines.map(x => x.toUpperCase) creates a new String RDD with the same number of records but with uppercase string literals, as shown below.
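Here is a minimal sketch of that behavior (the sample lines are my own illustration; sc is a SparkContext created the same way as in the full example later in this article):

val lines = sc.parallelize(Seq("hello world", "apache spark"))
val upperCased = lines.map(x => x.toUpperCase)
upperCased.collect().foreach(println)
// 2 records in, 2 records out:
// HELLO WORLD
// APACHE SPARK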
On the other hand, the flatMap transform operation is one-to-many. For example, if we call lines.flatMap(x => x.split(' ')), flatMap will create a new RDD in which every word becomes a separate record, because it splits each line at the spaces. Two lines of three words each would therefore produce an RDD with 6 records, as in the sketch below.
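A minimal sketch, again with made-up sample lines:

val lines = sc.parallelize(Seq("to be or", "not to be"))
val words = lines.flatMap(x => x.split(' '))
words.collect().foreach(println)
// 2 input records become 6 output records:
// to, be, or, not, to, be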
flatMap can be used for one-to-one as well as one-to-zero mappings. For example, lines.flatMap(x => None) returns an empty RDD, because flatMap does not create a record in the resulting RDD for None values.
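A more practical pattern built on the same idea, one-to-zero-or-one mapping with Option, is sketched below (this example is mine, not part of the original walkthrough):

val lines = sc.parallelize(Seq("keep", "", "this"))
val nonEmpty = lines.flatMap(x => if (x.nonEmpty) Some(x) else None)
nonEmpty.collect().foreach(println)
// Some(...) emits one record, None emits nothing:
// keep
// this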
Sometimes we may need to split a Key-Value RDD. There is a convenient method flatMapValues which can be used to split a Key-Value pair into multiple Key-Value pairs. For example, calling flatMapValues(x => x.split(' ')) splits each value into words and creates a separate Key-Value pair for each word. Notice that split values in the resultant RDD have the same key if they came from the same Key-Value pair, as in the sketch below.
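A minimal sketch with made-up pairs:

val pairs = sc.parallelize(Seq((1, "to be or"), (2, "not to be")))
val splitPairs = pairs.flatMapValues(x => x.split(' '))
splitPairs.collect().foreach(println)
// 2 pairs become 6 pairs; each split value keeps its original key:
// (1,to) (1,be) (1,or) (2,not) (2,to) (2,be)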
With this knowledge, we can jump into the example which uses the flatMapValues transform operation to list action movies from the MovieLens dataset. As we did in the previous articles, we are going to use the same MovieLens dataset we used in the first article: Spark 01: Movie Rating Counter. If you don't have the dataset, please follow the first article and download it.

The MovieLens dataset has a movies.csv file which contains the genres of each movie. The following table shows a few sample rows extracted from the movies.csv file:
movieId | title | genres |
---|---|---|
6 | Heat (1995) | Action\|Crime\|Thriller |
7 | Sabrina (1995) | Comedy\|Romance |
8 | Tom and Huck (1995) | Adventure\|Children |
9 | Sudden Death (1995) | Action |
10 | GoldenEye (1995) | Action\|Adventure\|Thriller |
As you can see, a movie can have multiple genres, separated by the pipe ( | ) character. In the following example, we are going to list all action movies on the console.

Step 1:
Create a new Scala → sbt project in IntelliJ IDEA named ActionMovies and add the following dependency to the build.sbt file:
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
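For reference, a complete build.sbt may look like the following; the name, version, and scalaVersion values are my assumptions (any Scala version supported by Spark 2.4.0, i.e. 2.11 or 2.12, works):

name := "ActionMovies"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"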
Please check the first article on this series: Spark 01: Movie Rating Counter, for detailed instructions.
Step 2:
Create a new package com.javahelps.spark and create a new Scala object named ActionMovies.

Step 3:
Create a main function and add the code to read the movies.csv file as shown below:
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read the movies.csv file as a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row, which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)
    }
}
Please note that so far we haven't done anything new compared to the first program: Spark 01: Movie Rating Counter.
Step 4:
Let's remove the unnecessary movieId column from the record using the map operation.

package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read the movies.csv file as a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row, which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(row => row.split(','))
            .map(fields => (fields(1), fields(2)))
    }
}
In the above code, the first map operation splits each line read from the CSV file into separate fields at the comma ( , ) character. The second map operation discards the first field, which is movieId in our scenario. (Note that a few movie titles in this dataset contain commas and are therefore quoted in the CSV; a plain split by comma misparses those rows. A proper CSV parser would handle them, but we keep the simple split here for clarity.) After this operation, our sample dataset will look like the following:
Key: title | Value: genres |
---|---|
Heat (1995) | Action\|Crime\|Thriller |
Sabrina (1995) | Comedy\|Romance |
Tom and Huck (1995) | Adventure\|Children |
Sudden Death (1995) | Action |
GoldenEye (1995) | Action\|Adventure\|Thriller |
Step 5:
It's time to use the flatMapValues operation. In the following code, I split all values (genres) by the pipe ( | ) character.

package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read the movies.csv file as a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row, which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(row => row.split(','))
            .map(fields => (fields(1), fields(2)))
            .flatMapValues(x => x.split('|'))
    }
}
After the flatMapValues operation, the sample dataset will look like the following table. Notice that now there are duplicate movie names, but each (title, genre) pair is unique.

Key: title | Value: genre |
---|---|
Heat (1995) | Action |
Heat (1995) | Crime |
Heat (1995) | Thriller |
Sabrina (1995) | Comedy |
Sabrina (1995) | Romance |
Tom and Huck (1995) | Adventure |
Tom and Huck (1995) | Children |
Sudden Death (1995) | Action |
GoldenEye (1995) | Action |
GoldenEye (1995) | Adventure |
GoldenEye (1995) | Thriller |
Step 6:
This step is quite straightforward. After the last step, we have a Key-Value RDD with movie title and genre. We just need to keep only the pairs whose genre is "Action" in this example. Append a filter operation to the code as shown below:

package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read the movies.csv file as a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row, which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(row => row.split(','))
            .map(fields => (fields(1), fields(2)))
            .flatMapValues(x => x.split('|'))
            .filter(x => x._2 == "Action")
    }
}
After this operation, our sample dataset will have only the Key-Value pairs with the value "Action".

Key: title | Value: genre |
---|---|
Heat (1995) | Action |
Sudden Death (1995) | Action |
GoldenEye (1995) | Action |
Step 7:
In this step, you can simply collect and print the result. However, I prefer to remove the genre from the record and sort the result by movie name. The following code removes the genre using a map operation and prints the sorted list of action movies.

package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read the movies.csv file as a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row, which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        // Keep only the titles of action movies and collect them to the driver
        val result = data.map(row => row.split(','))
            .map(fields => (fields(1), fields(2)))
            .flatMapValues(x => x.split('|'))
            .filter(x => x._2 == "Action")
            .map(x => x._1)
            .collect()

        // Sort the collected titles and print them
        result.sorted
            .foreach(println)
    }
}
Running this code prints the following output:

'71 (2014)
'Hellboy': The Seeds of Creation (2004)
12 Rounds (2009)
13 Assassins (Jûsan-nin no shikaku) (2010)
...
...
Zulu (1964)
eXistenZ (1999)
xXx (2002)
xXx: State of the Union (2005)
As you can see, all of them are action movies, so our code works as expected. You can change the filter function to list movies of any other genre.

If you have any questions, feel free to comment below.

Find the project @ GitHub