Spark 04: Key Value RDD and Average Movie Ratings

In the first article of this series, Spark 01: Movie Rating Counter, we created three RDDs (data, filteredData and ratingData), each holding a single datatype. For example, data and filteredData were String RDDs and ratingData was a Float RDD. However, depending on the requirement, it is common to use an RDD that stores a more complex datatype, especially key-value pairs. In this article, we will use a key-value RDD to calculate the average rating of each movie in our MovieLens dataset. If you don't have the MovieLens dataset yet, please visit the Spark 01: Movie Rating Counter article to set up your environment.


As you already know, the ratings.csv file has the fields movieId and rating. A given movie may get different ratings from different users. To get the average rating of each movie, we need to add up all the ratings of that movie and divide the sum by the number of ratings.

Suppose we have the following dataset: the movie with the id 3809 has an average rating of 3.17 stars, and the movie with the id 4006 has an average rating of 4.0 stars. Let's work out this example in the following steps.
userId | movieId | rating | timestamp
1      | 3809    | 4.0    | 964981220
1      | 4006    | 4.0    | 964982903
27     | 3809    | 1.0    | 965149421
19     | 3809    | 3.0    | 965706415
4      | 3809    | 3.0    | 986936207
66     | 3809    | 4.0    | 1113188421
68     | 3809    | 3.5    | 1158534962
64     | 3809    | 4.0    | 1161566059
51     | 3809    | 3.0    | 1230930133
78     | 3809    | 3.0    | 1252573355
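
As a quick sanity check of the 3.17 figure: movie 3809 has nine ratings in this sample that sum to 28.5, and 28.5 / 9 ≈ 3.17. You can verify this in plain Scala, without Spark:

// Ratings of movie 3809 from the sample above
val ratings3809 = List(4.0f, 1.0f, 3.0f, 3.0f, 4.0f, 3.5f, 4.0f, 3.0f, 3.0f)
println(ratings3809.sum / ratings3809.size)   // 28.5f / 9 = 3.1666667 ≈ 3.17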

Step 1:
Create a new Scala → sbt project in IntelliJ IDEA named AverageMovieRatings and add the following dependency to the build.sbt file.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
Please check the first article of this series: Spark 01: Movie Rating Counter, for detailed instructions.
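
For reference, a minimal build.sbt could look like the following; the project name, version and Scala version (2.11 here, which Spark 2.4.0 supports) are assumptions you can adjust to match your setup.

name := "AverageMovieRatings"

version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"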

Step 2:
Create a new package com.javahelps.spark and create a new Scala object named AverageMovieRatings.

Step 3:
Create a main function and add the code to read the ratings.csv file as shown below:
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object AverageMovieRatings {

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)
    }
}
Please note that so far we haven't done anything new compared to the first program: Spark 01: Movie Rating Counter.
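
If you want to verify that the header row was actually filtered out, you can temporarily add a take action after the filter. This is only a sanity check, not part of the final program:

// Print the first three remaining rows to confirm the header is gone
data.take(3).foreach(println)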

Step 4:
Create a new function mapToTuple and use it to map the String rows to tuples containing only the attributes we need.
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
    }
}
In this code, the mapToTuple function converts a String row into a key-value pair movieId → (rating, count), where the initial value of count is 1. After the map operation, the resulting RDD will have a dataset similar to this:
Key: movieId | Value: (rating, count)
3809 | (4.0, 1)
4006 | (4.0, 1)
3809 | (1.0, 1)
3809 | (3.0, 1)
3809 | (3.0, 1)
3809 | (4.0, 1)
3809 | (3.5, 1)
3809 | (4.0, 1)
3809 | (3.0, 1)
3809 | (3.0, 1)
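
To see the mapping on a single row, you can call mapToTuple directly with the first data row from our sample; this is plain Scala and does not need Spark:

// "userId,movieId,rating,timestamp" => (movieId, (rating, 1))
println(mapToTuple("1,3809,4.0,964981220"))   // prints (3809,(4.0,1))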


Step 5:
Perform a reduceByKey operation on this RDD to find the sum of the ratings and the number of ratings for each movie.
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    }
}
In this step, we reduce the key-value pairs by key (in other words, we combine all values having the same key using an arithmetic operation) and get a resultant RDD with tuples of (movieId, (totalRating, count)). After the reduceByKey operation, our sample dataset will look like this:
Key: movieId | Value: (totalRating, count)
3809 | (28.5, 9)
4006 | (4.0, 1)
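
To understand what the reduce function passed to reduceByKey does, you can apply the same lambda to two plain value tuples of movie 3809; it adds the rating sums and the counts element-wise:

// The same function that reduceByKey applies to two values of one key
val combine = (x: (Float, Int), y: (Float, Int)) => (x._1 + y._1, x._2 + y._2)
println(combine((4.0f, 1), (1.0f, 1)))   // (5.0,2): two ratings of movie 3809 combined into a running sum and count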

Step 6:
Given the totalRating and the count (number of ratings), we can find the average rating by dividing totalRating by count. Let's do this with another map operation as shown below:
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
    }
}
The above operation will convert the sample dataset into tuples as shown below:
Key: movieId | Value: averageRating
3809 | 3.17
4006 | 4.0
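
The division in the last map is ordinary Float arithmetic; for movie 3809 it works out as follows:

// totalRating / count for movie 3809
println(28.5f / 9)   // 3.1666667, i.e. roughly 3.17 stars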

Step 7:
At the end of Step 6, we will have movieIds mapped to their average ratings. You could collect and print them as they are in this step. However, I prefer to print the list of movieIds sorted by their average ratings. Spark provides a convenient method, sortBy, to sort an RDD of tuples by any element of the tuple. The following code sorts the result by average rating in descending order and prints the output to the console.
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
          .sortBy(_._2, false)
          .collect()

        result.foreach(println)
    }
}
Running this code produces an output starting with:
(136850,5.0)
(1310,5.0)
(8238,5.0)
(5746,5.0)
(26928,5.0)
(134796,5.0)
(146684,5.0)
(88448,5.0)
(2824,5.0)
(2972,5.0)
(136556,5.0)
...
This means all these movies have an average rating of 5.0. I was curious to know which movies received only 5.0-star ratings, so I manually searched for the movie id 136556 in the movies.csv file. It is mapped to "Kung Fu Panda: Secrets of the Masters (2011)". No doubt, this movie deserves its 5-star ratings. However, remember that these ratings may not match IMDb or any other site you follow.
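
If you'd rather do that lookup with Spark instead of searching the file by hand, a minimal sketch is shown below. It assumes movies.csv sits in the same folder as ratings.csv and reuses the existing SparkContext; the prefix match on the movie id is just a quick one-off check, not a general CSV join.

// Hypothetical lookup of a single movie title by its id (not part of the final program)
val movies = sc.textFile("/tmp/ml-latest-small/movies.csv")
movies.filter(line => line.startsWith("136556,"))
  .collect()
  .foreach(println)   // prints the movies.csv row for movie 136556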

If you have any questions, feel free to comment below.

Find the project @ GitHub