Spark 01: Movie Rating Counter

Apache Spark is a must-know framework for big data developers. This is an attempt to write a series of articles on Apache Spark to take you from zero to hero. In this series, I will use the latest Apache Spark release, which is 2.4.0 as of January 2019. In the first few articles, we will code and test Apache Spark on IntelliJ IDEA. As you may already know, Apache Spark is developed in Scala, and of course, APIs are available for other languages including Java and Python. Still, Scala is preferred over the other languages for its performance and compact code. Therefore, you need to prepare the environment first.

My articles will be based on Frank Kane's course on Udemy: Apache Spark 2 with Scala - Hands On with Big Data! I highly recommend his course if you prefer a video tutorial.

Setup the Environment

Apache Spark 2.4.0 depends on Scala 2.12, which in turn depends on Java 1.8. Unlike Java, Scala is not very compatible across versions. Therefore, please take special care when choosing versions.

Step 1:
Install Oracle Java Development Kit 1.8 on your system. Linux users can follow this article: Install Oracle JDK 8 on Linux

Step 2:
Install the latest IntelliJ IDEA. Again, Linux users can follow my article: Install IntelliJ IDEA on Linux

Step 3:
Install Scala plugin in IntelliJ IDEA. Regardless of your operating system, you can follow the article Setup Scala on IntelliJ IDEA.

Download the Dataset

Since this is a series of articles, we need a dataset to play with. We will use a newer version of the MovieLens database used in Frank Kane's tutorial. Please visit the GroupLens website and download the small "MovieLens Latest Datasets".

Extract the ml-latest-small.zip file and play with the dataset. In this tutorial, we are going to count movies by rating and print them to the console. To do that, we need only the ml-latest-small/ratings.csv file which has rows similar to this:
userId,movieId,rating,timestamp
1,3809,4.0,964981220
1,4006,4.0,964982903
1,5060,5.0,964984002
2,318,3.0,1445714835
2,333,4.0,1445715029
2,1704,4.5,1445715228
2,3578,4.0,1445714885
2,6874,4.0,1445714952
As you can see, the user with the id 1 gave a 4-star rating to the movie with the id 3809. In this piece of data, we have one movie with a 3-star rating, five movies with a 4-star rating, one movie with a 4.5-star rating and one more movie with a 5-star rating.
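
Before bringing Spark in, the following plain Scala sketch (my own illustration, not part of the tutorial code) shows the computation we are going to distribute: take the rating column of the sample rows above, group equal values together and count the occurrences.

// Plain Scala illustration of counting occurrences of each rating value
object RatingCountSketch {

    def main(args: Array[String]): Unit = {
        // The rating column of the sample rows shown above
        val ratings = Seq(4.0f, 4.0f, 5.0f, 3.0f, 4.0f, 4.5f, 4.0f, 4.0f)

        // Group equal ratings together and count each group
        val counts = ratings.groupBy(identity).mapValues(_.size).toMap

        // Prints (3.0,1), (4.0,5), (4.5,1) and (5.0,1), one per line
        counts.toSeq.sorted.foreach(println)
    }
}

Spark lets us run exactly this kind of counting on data that is too large for a single machine.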

Get Your Hands Dirty

Step 1:
Create a new Scala → sbt project as shown in Setup Scala on IntelliJ IDEA Step #6.
Use the name Movie Ratings Counter and make sure that you have selected JDK 1.8 and Scala 2.12.x.

Step 2:
Open the build.sbt file and append the following dependency definition to it.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
After the modification, if IntelliJ asks you to Import Changes or Enable Auto-Import, click on Enable Auto-Import so that the changes we make are imported automatically.
After the modification, your build.sbt file should look like this:
name := "Movie Ratings Counter"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

Step 3:
Expand the src → main → scala folder, right-click on it, and create a new package named com.javahelps.spark.

Step 4:
Right-click on the newly created package and select New → Scala Class. In the dialog that appears, enter the name MovieRatingsCounter and select the kind Object. Finally, click on OK to create the Scala object file.

Step 5:
In the MovieRatingsCounter object create a new main function as given below:
package com.javahelps.spark

object MovieRatingsCounter {

    def main(args: Array[String]): Unit = {

    }
}


Step 6: Pay closer attention from this step onwards.
Every Spark application needs a SparkContext to access Spark APIs. Create a new SparkContext object as shown below:
package com.javahelps.spark

import org.apache.spark.SparkContext

object MovieRatingsCounter {

    def main(args: Array[String]): Unit = {

        val sc = new SparkContext("local[*]", "MovieRatingsCounter")
    }
}
The first argument of SparkContext tells Spark to use all CPU cores available in your (local) computer for the upcoming operations. If you want to allocate only 2 cores, change it to local[2]. The second argument is your Spark application name, which is MovieRatingsCounter in this example.
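
The two-argument constructor used above is a convenient shortcut. If you later want to set more options, you can build the context from a SparkConf instead; the following equivalent sketch is my own addition and not required for this tutorial:

import org.apache.spark.{SparkConf, SparkContext}

// Equivalent construction through SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")             // use all local CPU cores
  .setAppName("MovieRatingsCounter") // application name shown by Spark
val sc = new SparkContext(conf)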

Step 7:
Read the ratings.csv file as a text file.
package com.javahelps.spark

import org.apache.spark.SparkContext

object MovieRatingsCounter {

    def main(args: Array[String]): Unit = {

        val sc = new SparkContext("local[*]", "MovieRatingsCounter")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")
    }
}
In this code, I read the file from the /tmp/ml-latest-small/ratings.csv path. Depending on where you extracted the ml-latest-small.zip file, your path may differ.
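
If you do not want to hardcode the path, one option (my own addition, not part of the original code) is to take it as a program argument and fall back to the path used in this article:

// Take the dataset path from the first program argument if provided,
// otherwise fall back to the path used in this article
val path = if (args.nonEmpty) args(0) else "/tmp/ml-latest-small/ratings.csv"
var data = sc.textFile(path)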

Step 8:
The ratings.csv file has a header row which is not necessary for our computation. Remove the header by extracting the first row from the data and filtering it out of the remaining rows.
package com.javahelps.spark

import org.apache.spark.SparkContext

object MovieRatingsCounter {

    def main(args: Array[String]): Unit = {

        val sc = new SparkContext("local[*]", "MovieRatingsCounter")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)
    }
}
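
Note that first() runs a small Spark job just to fetch the header. As an optional alternative (my own addition, not used in the rest of this article), you can drop the first line of the first partition directly, since when reading a single text file the header ends up in partition 0:

// Alternative: drop the header without a separate first() job
val withoutHeader = data.mapPartitionsWithIndex(
    (index, rows) => if (index == 0) rows.drop(1) else rows)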

Step 9:
Now map the lines of strings to float rating values and count the number of occurrences of each unique value.
package com.javahelps.spark

import org.apache.spark.SparkContext

object MovieRatingsCounter {

    def main(args: Array[String]): Unit = {

        val sc = new SparkContext("local[*]", "MovieRatingsCounter")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(line => line.split(',')(2).toFloat) // Extract rating from line as float
          .countByValue() // Count number of occurrences of each number

        println(result)
    }
}
The countByValue function returns a Scala Map[Float, Long]. You can run this code and check the output.
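
If you are wondering what countByValue does for you, roughly the same result can be produced manually by pairing each rating with 1 and summing the counts per rating. The following sketch (my own addition, fitting into the same main method) is only for illustration:

// Roughly equivalent to countByValue(): pair each rating with 1 and sum per rating
val resultViaReduce = data.map(line => line.split(',')(2).toFloat)
  .map(rating => (rating, 1L)) // (rating, 1) pairs
  .reduceByKey(_ + _)          // add up the 1s for each distinct rating
  .collectAsMap()              // bring the small result back to the driver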

Step 10:
If you ran the code in the previous step, you might have noticed a lot of INFO logs printed on the console, making the output clumsy. You can fix it by setting the log level of the logger. In the following code, I have inserted a Logger.getLogger("org").setLevel(Level.ERROR) statement to tell the logger to log only error messages. At the same time, I also want the output to be sorted by rating. Therefore, I have replaced the println(result) statement with Scala code that sorts the map and prints it line by line.
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object MovieRatingsCounter {

    def main(args: Array[String]): Unit = {

        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "MovieRatingsCounter")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(line => line.split(',')(2).toFloat) // Extract rating from line as float
          .countByValue() // Count number of occurrences of each number

        // Sort and print the result
        result.toSeq
          .sorted
          .foreach(println)
    }
}
Running this code produces the following output:
(0.5,1370)
(1.0,2811)
(1.5,1791)
(2.0,7551)
(2.5,5550)
(3.0,20047)
(3.5,13136)
(4.0,26818)
(4.5,8551)
(5.0,13211)
You can interpret this output as follows: there are 1370 ratings of 0.5 stars, and so on.
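
If you prefer to see the most frequent rating first, a small variation (my own addition) is to sort by the count instead of the rating:

// Sort by count in descending order instead of by rating
result.toSeq
  .sortBy(-_._2) // negate the count to sort descending
  .foreach(println)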

Good job! You have successfully created your first Spark application, which reads a CSV file, counts its elements, and prints them in a decent format.


Find the project @ GitHub