Spark 06: Broadcast Variables

If you read the Spark 04: Key-Value RDD and Average Movie Ratings article, you might wonder what to do with the popular movie IDs printed at the end. A data analyst cannot ask users to manually look up those IDs in a CSV file to find the movie names. In this article, you will learn how to map those movie IDs to movie names using Apache Spark's broadcast variables.

If you want to share read-only data that fits into memory with every worker in your Spark cluster, broadcast it. A broadcast variable is distributed only once and cached on every worker node, so it can be reused any number of times. The basic pattern is sketched below; more about broadcasting is covered later in this article, after the code example.
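A minimal sketch of the pattern, assuming an existing SparkContext named sc (the lookup map and the numbers RDD here are placeholders for illustration, not part of the MovieLens example):

// A small read-only map that every task needs
val lookup = Map(1 -> "one", 2 -> "two", 3 -> "three")

// Ship the map to the workers once; returns a Broadcast[Map[Int, String]]
val broadcastLookup = sc.broadcast(lookup)

// Inside transformations, read the cached copy through .value
val numbers = sc.parallelize(Seq(1, 2, 3))
val named = numbers.map(id => broadcastLookup.value.getOrElse(id, "unknown"))

named.collect().foreach(println)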
In this article, I use the same AverageMovieRatings project from the Spark 04: Key-Value RDD and Average Movie Ratings article. You can either modify the existing code or create a new project with the code from that article.

For your reference, the final code from the Spark 04 article is provided below:
package com.javahelps.spark

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
          .sortBy(_._2, false)
          .collect()

        result.foreach(println)
    }
}
Running this code produces an output starting with:
(136850,5.0)
(1310,5.0)
(8238,5.0)
(5746,5.0)
(26928,5.0)
(134796,5.0)
(146684,5.0)
(88448,5.0)
(2824,5.0)
(2972,5.0)
(136556,5.0)
...

Step 1:
The movies.csv file contains the movieId and title columns, which we can use to look up a movie's name by its ID.

Create a function loadMovieNames that reads the movies.csv file and creates key-value pairs of movieId and title.
package com.javahelps.spark

import java.nio.charset.CodingErrorAction

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

import scala.io.{Codec, Source}

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def loadMovieNames(): Map[Int, String] = {

        // Handle character encoding issues
        implicit val codec = Codec("UTF-8")
        codec.onMalformedInput(CodingErrorAction.REPLACE)
        codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

        var movieNames: Map[Int, String] = Map()

        // Read lines from movies.csv into Iterator. Drop the first (header) row.
        val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
        for (line <- lines) {
            val fields = line.split(',')
            movieNames += (fields(0).toInt -> fields(1))
        }
        return movieNames
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
          .sortBy(_._2, false)
          .collect()

        result.foreach(println)
    }
}


Step 2:
After creating the SparkContext, broadcast the movie names and assign the returned Broadcast reference to a variable.
package com.javahelps.spark

import java.nio.charset.CodingErrorAction

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

import scala.io.{Codec, Source}

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def loadMovieNames(): Map[Int, String] = {

        // Handle character encoding issues
        implicit val codec = Codec("UTF-8")
        codec.onMalformedInput(CodingErrorAction.REPLACE)
        codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

        var movieNames: Map[Int, String] = Map()

        // Read lines from movies.csv into Iterator. Drop the first (header) row.
        val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
        for (line <- lines) {
            val fields = line.split(',')
            movieNames += (fields(0).toInt -> fields(1))
        }
        return movieNames
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Broadcast the movie names
        val names = sc.broadcast(loadMovieNames())

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
          .sortBy(_._2, false)
          .collect()

        result.foreach(println)
    }
}


Step 3:
Now you can use the broadcast variable anywhere in your Spark code through its value method. After sorting the records by average rating, map each movie ID to its title as shown below:
package com.javahelps.spark

import java.nio.charset.CodingErrorAction

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

import scala.io.{Codec, Source}

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def loadMovieNames(): Map[Int, String] = {

        // Handle character encoding issues
        implicit val codec = Codec("UTF-8")
        codec.onMalformedInput(CodingErrorAction.REPLACE)
        codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

        var movieNames: Map[Int, String] = Map()

        // Read lines from movies.csv into Iterator. Drop the first (header) row.
        val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
        for (line <- lines) {
            val fields = line.split(',')
            movieNames += (fields(0).toInt -> fields(1))
        }
        return movieNames
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Broadcast the movie names
        val names = sc.broadcast(loadMovieNames())

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
          .sortBy(_._2, false)
          .map(x => (names.value(x._1), x._2))
          .collect()

        result.foreach(println)
    }
}

Running this code produces the following output:
(Villain (1971),5.0)
(Hype! (1996),5.0)
(Little Murders (1971),5.0)
(Galaxy of Terror (Quest) (1981),5.0)
("Summer's Tale,5.0)
(Bitter Lake (2015),5.0)
(Cosmic Scrat-tastrophe (2015),5.0)
(Paper Birds (Pájaros de papel) (2010),5.0)
(On the Ropes (1999),5.0)
(Red Sorghum (Hong gao liang) (1987),5.0)
...

As you can see, the movie IDs are mapped to movie names. (Titles that contain commas, such as "Summer's Tale above, appear truncated because the code splits each line on plain commas and does not handle the quoted fields in movies.csv.) Broadcasting a variable distributes it across the cluster in an m-to-m distribution, which makes data sharing efficient in a large network. Worker nodes cache the broadcast variable and make it locally available to the operations executed on those nodes.

Apache Spark Broadcasting Variable
A broadcast variable should not be modified later in the program, because doing so makes it inconsistent across the cluster. For example, suppose you modify the underlying data after broadcasting it. If a worker node fails and is restarted, it may receive the modified data from the driver while the other worker nodes still hold the previously broadcast, unmodified copy, which may result in unexpected output.
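If the data genuinely needs to change, a common approach is to release the old broadcast and create a new one instead of mutating the broadcast value in place. A minimal sketch, assuming the sc and loadMovieNames from the listings above (unpersist only removes the cached copies from the executors):

// Broadcast the initial copy of the data
var names = sc.broadcast(loadMovieNames())

// ... use names.value in transformations ...

// When the data must be refreshed, drop the cached copies and broadcast a fresh one
names.unpersist()
names = sc.broadcast(loadMovieNames())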


Of course, you can access the movie names map directly, without broadcasting it, as shown below, and get the same output:
package com.javahelps.spark

import java.nio.charset.CodingErrorAction

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

import scala.io.{Codec, Source}

object AverageMovieRatings {

    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        return (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def loadMovieNames(): Map[Int, String] = {

        // Handle character encoding issues
        implicit val codec = Codec("UTF-8")
        codec.onMalformedInput(CodingErrorAction.REPLACE)
        codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

        var movieNames: Map[Int, String] = Map()

        // Read lines from movies.csv into Iterator. Drop the first (header) row.
        val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
        for (line <- lines) {
            val fields = line.split(',')
            movieNames += (fields(0).toInt -> fields(1))
        }
        return movieNames
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Load the movie names without broadcasting them
        val names = loadMovieNames()

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first();

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
          .sortBy(_._2, false)
          .map(x => (names(x._1), x._2))
          .collect()

        result.foreach(println)
    }
}
However, this code ships the variable to the worker nodes along with the operation to be executed, in a 1-to-m distribution. In other words, the driver sends a copy of the variable to the executors together with the tasks that use it, instead of distributing it once and caching it on each node.

Apache Spark Sharing Variable

Running both versions (the broadcast variable and the directly shared variable) on your local machine will not show a big difference in performance, because the shared data is relatively small and the program runs on a single machine using multiple threads. To see the difference, you need to run a similar Spark application on an actual cluster with a large shared variable. However, never share a variable directly in a production environment. In conclusion, remember these three rules:
  • To share data across the cluster, broadcast the variable instead of directly accessing it in the code
  • Never modify a variable after broadcasting it
  • Ensure that the data you broadcast can fit into the memory of each node (the sketch below shows one way to estimate its size)
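As a rough check on the last rule, Spark's SizeEstimator utility can estimate the in-memory footprint of an object on the driver before you broadcast it. A minimal sketch, assuming the loadMovieNames function from the listings above (SizeEstimator is a developer API, so treat the result as an estimate):

import org.apache.spark.util.SizeEstimator

val movieNames = loadMovieNames()

// Estimated JVM memory footprint of the map, in bytes
val estimatedBytes = SizeEstimator.estimate(movieNames)
println(s"Broadcasting approximately ${estimatedBytes / 1024} KB")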
If you have any questions, feel free to comment below.

Find the project @ GitHub