If you read the Spark 04: Key-Value RDD and Average Movie Ratings article, you might wonder what to do with the popular movie IDs printed at the end. A data analyst cannot ask users to look up those IDs in a CSV file by hand to find the movie names. In this article, you will learn how to map those movie IDs to movie names using Apache Spark's variable broadcasting.
If you want to share read-only data that fits into memory with every worker in your Spark cluster, broadcast it. A broadcast variable is distributed only once and cached on every worker node, so it can be reused any number of times. More about broadcasting is covered later in this article, after the code example.
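To get a feel for the API before diving into the project, here is a minimal, self-contained sketch (the object name, lookup table, and values are illustrative and not part of the project below): the driver broadcasts a small Map once, and each task reads it through .value.

import org.apache.spark.SparkContext

object BroadcastSketch {

    def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[*]", "BroadcastSketch")

        // A small read-only lookup table that every task needs
        val lookup = Map(1 -> "one", 2 -> "two", 3 -> "three")

        // Distribute it once; each worker node caches its own copy
        val broadcastLookup = sc.broadcast(lookup)

        // Tasks read the cached copy through .value instead of
        // shipping the Map with every task closure
        val translated = sc.parallelize(Seq(1, 2, 3))
            .map(id => broadcastLookup.value(id))
            .collect()

        translated.foreach(println)
        sc.stop()
    }
}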
In this article, I use the same project from the Spark 04: Key-Value RDD and Average Movie Ratings article: AverageMovieRatings. You can either modify the existing code or create a new project with the code from the Spark 04 article.
For your reference, the final code from the Spark 04 article is provided below:
package com.javahelps.spark
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
object AverageMovieRatings {
def mapToTuple(line: String): (Int, (Float, Int)) = {
val fields = line.split(',')
return (fields(1).toInt, (fields(2).toFloat, 1))
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "AverageMovieRatings")
// Read a text file
var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")
// Extract the first row which is the header
val header = data.first();
// Filter out the header from the dataset
data = data.filter(row => row != header)
val result = data.map(mapToTuple)
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.map(x => (x._1, x._2._1 / x._2._2))
.sortBy(_._2, false)
.collect()
result.foreach(println)
}
}
Running this code produces an output starting with:
(136850,5.0)
(1310,5.0)
(8238,5.0)
(5746,5.0)
(26928,5.0)
(134796,5.0)
(146684,5.0)
(88448,5.0)
(2824,5.0)
(2972,5.0)
(136556,5.0)
...
Step 1:
The movies.csv file has movieId and title columns (plus a genres column that we do not need here), as shown in the sample below.
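For reference, the first few lines of movies.csv look roughly like this (the exact rows depend on the MovieLens release you downloaded):

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance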
Create a function loadMovieNames to read the movies.csv file and to create Key-Value pairs of movieIds and titles.
package com.javahelps.spark
import java.nio.charset.CodingErrorAction
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import scala.io.{Codec, Source}
object AverageMovieRatings {
def mapToTuple(line: String): (Int, (Float, Int)) = {
val fields = line.split(',')
return (fields(1).toInt, (fields(2).toFloat, 1))
}
def loadMovieNames(): Map[Int, String] = {
// Handle character encoding issues
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
var movieNames: Map[Int, String] = Map()
// Read lines from movies.csv into Iterator. Drop the first (header) row.
val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
for (line <- lines) {
val fields = line.split(',')
movieNames += (fields(0).toInt -> fields(1))
}
return movieNames
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "AverageMovieRatings")
// Read a text file
var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")
// Extract the first row which is the header
val header = data.first();
// Filter out the header from the dataset
data = data.filter(row => row != header)
val result = data.map(mapToTuple)
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.map(x => (x._1, x._2._1 / x._2._2))
.sortBy(_._2, false)
.collect()
result.foreach(println)
}
}
Step 2:
After creating the SparkContext, broadcast the movie names and assign the returned Broadcast reference to a variable.
package com.javahelps.spark
import java.nio.charset.CodingErrorAction
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import scala.io.{Codec, Source}
object AverageMovieRatings {
def mapToTuple(line: String): (Int, (Float, Int)) = {
val fields = line.split(',')
return (fields(1).toInt, (fields(2).toFloat, 1))
}
def loadMovieNames(): Map[Int, String] = {
// Handle character encoding issues
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
var movieNames: Map[Int, String] = Map()
// Read lines from movies.csv into Iterator. Drop the first (header) row.
val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
for (line <- lines) {
val fields = line.split(',')
movieNames += (fields(0).toInt -> fields(1))
}
return movieNames
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "AverageMovieRatings")
// Broadcast the movie names
val names = sc.broadcast(loadMovieNames())
// Read a text file
var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")
// Extract the first row which is the header
val header = data.first();
// Filter out the header from the dataset
data = data.filter(row => row != header)
val result = data.map(mapToTuple)
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.map(x => (x._1, x._2._1 / x._2._2))
.sortBy(_._2, false)
.collect()
result.foreach(println)
}
}
Step 3:
Now you can use the broadcast variable anywhere in your Spark code. After sorting the records by rating, map each movie ID to its title as shown below:
package com.javahelps.spark
import java.nio.charset.CodingErrorAction
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import scala.io.{Codec, Source}
object AverageMovieRatings {
def mapToTuple(line: String): (Int, (Float, Int)) = {
val fields = line.split(',')
return (fields(1).toInt, (fields(2).toFloat, 1))
}
def loadMovieNames(): Map[Int, String] = {
// Handle character encoding issues
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
var movieNames: Map[Int, String] = Map()
// Read lines from movies.csv into Iterator. Drop the first (header) row.
val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
for (line <- lines) {
val fields = line.split(',')
movieNames += (fields(0).toInt -> fields(1))
}
return movieNames
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "AverageMovieRatings")
// Broadcast the movie names
val names = sc.broadcast(loadMovieNames())
// Read a text file
var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")
// Extract the first row which is the header
val header = data.first();
// Filter out the header from the dataset
data = data.filter(row => row != header)
val result = data.map(mapToTuple)
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.map(x => (x._1, x._2._1 / x._2._2))
.sortBy(_._2, false)
.map(x => (names.value(x._1), x._2))
.collect()
result.foreach(println)
}
}
Running this code produces the following output:
(Villain (1971),5.0)
(Hype! (1996),5.0)
(Little Murders (1971),5.0)
(Galaxy of Terror (Quest) (1981),5.0)
("Summer's Tale,5.0)
(Bitter Lake (2015),5.0)
(Cosmic Scrat-tastrophe (2015),5.0)
(Paper Birds (Pájaros de papel) (2010),5.0)
(On the Ropes (1999),5.0)
(Red Sorghum (Hong gao liang) (1987),5.0)
...
As you can see, the movie IDs are mapped to movie names. (The odd-looking entry ("Summer's Tale,5.0) is not a typo: loadMovieNames splits each line on plain commas, and titles that contain commas are quoted in movies.csv, so such titles end up truncated.) Broadcasting a variable distributes it across the cluster in an m-to-m (many-to-many) fashion, which makes data sharing efficient in a large network. Worker nodes cache the broadcast variable and make it locally available to the operations executed on those nodes.
A broadcast variable should not be modified later in the program because doing so makes it inconsistent across the cluster. For example, suppose you modify the variable after broadcasting it. If a worker node fails and is restarted, it may receive the modified variable from the driver while the other worker nodes still hold the previously broadcast, unmodified value. That can produce unexpected output.
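If the lookup data legitimately changes, the safer pattern is to build a new Map and broadcast it as a fresh variable, rather than mutating the value behind the existing broadcast. Here is a minimal sketch, reusing sc and loadMovieNames from the code above (the added entry is purely hypothetical):

// First broadcast, exactly as in Step 2
val namesV1 = sc.broadcast(loadMovieNames())

// Later, the mapping needs an update: build a new Map instead of
// mutating the one behind namesV1
val updatedNames = loadMovieNames() + (999999 -> "Hypothetical Title (2024)")

// Drop the stale copies cached on the executors, then broadcast the new value
namesV1.unpersist()
val namesV2 = sc.broadcast(updatedNames)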
Of course, you can access the movie names variable directly, without broadcasting it, as shown below, and get the same output:
package com.javahelps.spark
import java.nio.charset.CodingErrorAction
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import scala.io.{Codec, Source}
object AverageMovieRatings {
def mapToTuple(line: String): (Int, (Float, Int)) = {
val fields = line.split(',')
return (fields(1).toInt, (fields(2).toFloat, 1))
}
def loadMovieNames(): Map[Int, String] = {
// Handle character encoding issues
implicit val codec = Codec("UTF-8")
codec.onMalformedInput(CodingErrorAction.REPLACE)
codec.onUnmappableCharacter(CodingErrorAction.REPLACE)
var movieNames: Map[Int, String] = Map()
// Read lines from movies.csv into Iterator. Drop the first (header) row.
val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
for (line <- lines) {
val fields = line.split(',')
movieNames += (fields(0).toInt -> fields(1))
}
return movieNames
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "AverageMovieRatings")
// Load the movie names without broadcasting them
val names = loadMovieNames()
// Read a text file
var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")
// Extract the first row which is the header
val header = data.first();
// Filter out the header from the dataset
data = data.filter(row => row != header)
val result = data.map(mapToTuple)
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.map(x => (x._1, x._2._1 / x._2._2))
.sortBy(_._2, false)
.map(x => (names(x._1), x._2))
.collect()
result.foreach(println)
}
}
However, this code shares the variable with the worker nodes along with the operation to be executed, in a 1-to-m distribution. In other words, the driver node sends the variable to each and every executor in the cluster together with the task to be executed. Running both versions (broadcasting the variable and sharing it directly) on your local machine will not show a significant performance difference, because the shared data is relatively small and the program runs on the same machine using multiple threads. To see the difference, you need to run a similar Spark application on an actual cluster with a large shared variable. Still, never share a variable directly in a production environment. In conclusion, remember these three rules:
- To share data across the cluster, broadcast the variable instead of directly accessing it in the code
- Never modify a variable after broadcasting it
- Ensure that the data you broadcast can fit into the memory of each node
Find the project @ GitHub