Spark 05: List Action Movies with Spark flatMap

Welcome to the fifth article in the series of Apache Spark tutorials. In this article, you will learn how to apply the flatMap transformation. After introducing the flatMap operation, we develop a sample Spark application that lists all action movies from the MovieLens dataset.

In the previous articles, we used the map transformation, which transforms an entity into another entity in a one-to-one fashion. For example, suppose you have a String RDD named lines; applying lines.map(x => x.toUpperCase) creates a new String RDD with the same number of records but with uppercase string literals, as shown below:
[Figure: Apache Spark map]
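A minimal sketch of this behavior (the lines RDD and its three records are illustrative assumptions; sc is a SparkContext as created in the steps below):
// Three records in, three records out: map is one-to-one
val lines = sc.parallelize(Seq("Apache Spark", "flat map", "action movies"))
val upper = lines.map(x => x.toUpperCase)
upper.collect().foreach(println)  // APACHE SPARK, FLAT MAP, ACTION MOVIES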

On the other hand, the flatMap transformation is one-to-many. For example, calling lines.flatMap(x => x.split(' ')) creates a new RDD with 6 records, as shown below, because each record is split on spaces into separate words.

[Figure: Apache Spark flatMap]
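Continuing the sketch with the same assumed lines RDD:
// Three two-word records expand into six single-word records
val words = lines.flatMap(x => x.split(' '))
words.collect().foreach(println)  // Apache, Spark, flat, map, action, movies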

flatMap can also be used for one-to-one and even one-to-zero mappings. For example, lines.flatMap(x => None) returns an empty RDD, because flatMap does not create a record in the resulting RDD for None values.

[Figure: Apache Spark flatMap to None]
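Again as a sketch over the same assumed RDD:
// Every record maps to None, so the resulting RDD is empty
val empty = lines.flatMap(x => None)
println(empty.count())  // prints 0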

Sometimes we need to split the values of a Key-Value RDD. There is a convenient method, flatMapValues, which splits a Key-Value pair into multiple Key-Value pairs. For example, calling flatMapValues(x => x.split(' ')) as in the diagram shown below creates 6 records in the resulting RDD. Notice that the split values in the resulting RDD share the key of the Key-Value pair they came from.

[Figure: Apache Spark flatMapValues]
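A minimal sketch of the same idea (the pairs RDD is an illustrative assumption):
// Each value is split into words; every word keeps its original key
val pairs = sc.parallelize(Seq((1, "Apache Spark"), (2, "flat map"), (3, "action movies")))
val split = pairs.flatMapValues(x => x.split(' '))
split.collect().foreach(println)  // (1,Apache), (1,Spark), (2,flat), (2,map), (3,action), (3,movies)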

With this knowledge, we can jump into the example, which uses the flatMapValues transformation to list action movies from the MovieLens dataset. We use the same MovieLens dataset introduced in the first article: Spark 01: Movie Rating Counter. If you don't have the dataset yet, please follow the first article and download it.

The MovieLens dataset has a movies.csv file which contains the genres of each movie. The following table shows sample rows extracted from the movies.csv file.
movieId   title                  genres
6         Heat (1995)            Action|Crime|Thriller
7         Sabrina (1995)         Comedy|Romance
8         Tom and Huck (1995)    Adventure|Children
9         Sudden Death (1995)    Action
10        GoldenEye (1995)       Action|Adventure|Thriller
As you can see, the genres column contains all genres of a movie, separated by a pipe ( | ) character. In the following example, we list all action movies on the console.
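One Scala detail worth noting before we start: when splitting by the pipe, pass a Char ('|') rather than a String ("|"), because the String overload of split treats its argument as a regular expression, in which | is the alternation operator:
"Action|Crime|Thriller".split('|')   // Array(Action, Crime, Thriller)
"Action|Crime|Thriller".split("\\|") // same result, with the regex escaped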

Step 1:
Create a new Scala → sbt project in IntelliJ IDEA with the name ActionMovies and add the following dependency to the build.sbt file.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
Please check the first article in this series: Spark 01: Movie Rating Counter, for detailed instructions.
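For reference, a complete build.sbt might look like the following sketch; the exact Scala version is an assumption (Spark 2.4.0 is published for Scala 2.11 and 2.12):
name := "ActionMovies"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"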

Step 2:
Create a new package com.javahelps.spark and create a new Scala object named ActionMovies.

Step 3:
Create a main function and add the code to read the movies.csv file as shown below:
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)
    }
}
Please note that so far we haven't done anything new compared to the first program: Spark 01: Movie Rating Counter.

Step 4:
Let's remove the unnecessary movieId column from each record using the map operation.
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

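        // Caveat: a plain split on ',' is a simplification; it mis-parses quoted
        // titles that themselves contain commas (e.g. "Something, The (1995)").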
        val result = data.map(row => row.split(','))
          .map(fields => (fields(1), fields(2)))
    }
}
In the above code, the first map operation splits each line read from the CSV file into separate fields at the comma ( , ). The second map operation discards the first field, movieId, and produces a Key-Value pair of (title, genres). After this operation, our sample dataset looks like the following:
Key: title             Value: genres
Heat (1995)            Action|Crime|Thriller
Sabrina (1995)         Comedy|Romance
Tom and Huck (1995)    Adventure|Children
Sudden Death (1995)    Action
GoldenEye (1995)       Action|Adventure|Thriller


Step 5:
It's time to use the flatMapValues operation. In the following code, I split all values (genres) by the pipe ( | ) character.
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(row => row.split(','))
          .map(fields => (fields(1), fields(2)))
          .flatMapValues(x => x.split('|'))
    }
}
After the flatMapValues operation, the sample dataset looks like the following table. Notice that there are now duplicate movie titles, but each (title, genre) pair is unique.
Key: title             Value: genre
Heat (1995)            Action
Heat (1995)            Crime
Heat (1995)            Thriller
Sabrina (1995)         Comedy
Sabrina (1995)         Romance
Tom and Huck (1995)    Adventure
Tom and Huck (1995)    Children
Sudden Death (1995)    Action
GoldenEye (1995)       Action
GoldenEye (1995)       Adventure
GoldenEye (1995)       Thriller

Step 6:
This step is quite straightforward. After the last step, we have a Key-Value RDD of movie titles and genres. We just need to keep the pairs whose genre matches the one we are interested in, which is "Action" in this example. Append a filter operation to the code as shown below:
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(row => row.split(','))
          .map(fields => (fields(1), fields(2)))
          .flatMapValues(x => x.split('|'))
          .filter(x => x._2 == "Action")
    }
}
After this operation, our sample dataset has only the Key-Value pairs with the value "Action".
Key: title             Value: genre
Heat (1995)            Action
Sudden Death (1995)    Action
GoldenEye (1995)       Action

Step 7:
In this step, you can simply collect and print the result. However, I prefer to remove the genre from each record and sort the result by movie title. The following code removes the genre using a map operation and prints the sorted list of action movies.
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object ActionMovies {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "ActionMovies")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/movies.csv")

        // Extract the first row which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(row => row.split(','))
          .map(fields => (fields(1), fields(2)))
          .flatMapValues(x => x.split('|'))
          .filter(x => x._2 == "Action")
          .map(x => x._1)
          .collect()

        result.sorted
          .foreach(println)
    }
}
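A quick design note on the sorting: collect() brings all matching titles to the driver, and result.sorted then sorts them locally. Equivalently, you can let Spark sort the RDD in a distributed manner before collecting; a sketch of the final transformation chain under the same setup:
val result = data.map(row => row.split(','))
  .map(fields => (fields(1), fields(2)))
  .flatMapValues(x => x.split('|'))
  .filter(x => x._2 == "Action")
  .map(x => x._1)
  .sortBy(x => x)  // distributed sort on the RDD
  .collect()

result.foreach(println)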
Either way, running the program prints the following output:
'71 (2014)
'Hellboy': The Seeds of Creation (2004)
12 Rounds (2009)
13 Assassins (Jûsan-nin no shikaku) (2010)
...
...
Zulu (1964)
eXistenZ (1999)
xXx (2002)
xXx: State of the Union (2005)
As you can see, all of them are action movies, so our code works as expected. You can change the filter condition to list movies of any other genre.
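For example, changing the condition as follows would list comedy movies instead:
.filter(x => x._2 == "Comedy")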

If you have any questions, feel free to comment below.

Find the project @ GitHub