Spark has a handful of gotchas that apply when running in any clustered environment (standalone, K8s, Mesos, YARN) and that can cause runtime failures late in the execution of a job, depending on how it is coded. It is better to know about these pitfalls ahead of time, because the Scala compiler will not alert you to them: syntactically your code may be perfect.
Member Serialization
The following is an admittedly contrived example, since in practice you would simply create the variable inside main(). However, when sub-classing you may want to override values that are members of the base class, and in that situation serialization issues can arise. Spark serializes entire objects in their initial state to the workers, but it handles implicits differently. It is always better to declare and initialize variables that will not change locally, in the call stack from main(); a sketch of that approach follows the example below.
Note: Spark uses closures to define execution strategies, but these closures do not operate in the same fashion as closures in Scala itself.
import org.apache.spark.sql.SparkSession

case class Test(x: String)

object ObjectMemberIssue {
  // When foo is accessed via a map operation (transformation) on a worker
  // it will be null
  var foo: String = null

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("foo").getOrCreate()
    foo = "bar"
    import session.implicits._
    val dsOfStrings = (1 to 100).map(i => Test(i.toString)).toDS()
    // This will produce a NullPointerException on the worker, which will
    // cause it to die if the job is not running in local mode in the same JVM
    // as the driver. In cluster mode (standalone or otherwise) ObjectMemberIssue
    // will be serialized in its default state to the worker, so the access of
    // foo and the append to x produce the NPE.
    dsOfStrings.map(i => i.copy(i.x ++ foo)).collect().foreach(println)
  }
}
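The simplest fix, as noted above, is to declare and initialize the value locally in main() so that the closure captures the value itself rather than a member of a partially-initialized object. A minimal sketch, reusing the Test case class from the example above; the LocalValSolution object name is purely illustrative:

import org.apache.spark.sql.SparkSession

object LocalValSolution {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("foo").getOrCreate()
    // Declared and initialized locally, so the value itself is captured by
    // the closure below rather than looked up on a serialized object.
    val foo = "bar"
    import session.implicits._
    val dsOfStrings = (1 to 100).map(i => Test(i.toString)).toDS()
    // foo is serialized as part of the closure with its current value,
    // so the workers see "bar" rather than null.
    dsOfStrings.map(i => i.copy(i.x ++ foo)).collect().foreach(println)
  }
}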
An alternative solution is to use an implicit val:
import org.apache.spark.sql.SparkSession

object ObjectMemberIssueSolution {
  var foo: String = null

  // Implicits are searched for and contextually bound for serialization,
  // but they are intrinsically values and will not change.
  def append(base: String)(implicit s: String) = base ++ s

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("foo").getOrCreate()
    foo = "bar"
    // The implicit is contextually serialized, especially when passed through
    // another method, which forces it to be bound in the Spark serializer;
    // the Scala compiler guarantees implicits are resolved at compile time
    // based upon what is in scope at the call site.
    implicit val _foo: String = foo
    import session.implicits._
    val dsOfStrings = (1 to 100).map(i => Test(i.toString)).toDS()
    // In cluster mode (standalone or otherwise) ObjectMemberIssueSolution is
    // still serialized in its default state, so the member foo is null on the
    // worker. Because _foo is an implicit val captured before the
    // transformation, invocations of append on the worker use the captured
    // value and do not cause an issue.
    //
    // The implicit does not need to be passed explicitly; the compiler wires
    // it in at the call site.
    dsOfStrings.map(i => i.copy(append(i.x))).collect().foreach(println)
  }
}
Accumulators
Broadcast variables