Unit 5 — Scala & Spark Complete Notes

⚡ Exam today — read this during Unit 4

Scala Basics

val · var · lazy val · def · if/else · for loops · functions · exceptions

🔥val vs lazy val vs def — asked in Dec 2025 and Aug 2025 papers. Know the exact difference.
[IMP] val vs var vs lazy val vs def
val x = expensive()       // evaluated IMMEDIATELY, stored, IMMUTABLE
var x = expensive()       // evaluated IMMEDIATELY, stored, MUTABLE
lazy val x = expensive()  // evaluated FIRST TIME accessed, then stored
def x = expensive()       // evaluated EVERY TIME called, never stored
Keyword  | When evaluated     | Stored?               | Mutable?
val      | At definition time | Yes                   | No (immutable)
var      | At definition time | Yes                   | Yes
lazy val | First access only  | Yes (after first use) | No
def      | Every single call  | Never                 | N/A

lazy val sits between val and def: like def, it is not evaluated until it is used; like val, it is evaluated at most once and the result is then cached.

Declaring Values
val answer = 8 * 5 + 2       // Int, inferred
val s: String = "Hello"        // explicit type
var counter = 0
counter = 1                    // OK for var
// answer = 0  // ERROR: val is immutable
val xmax, ymax = 100         // both = 100
if / else (returns a value!)
// In Scala, if/else HAS a value
val s = if (x > 0) 1 else -1
// same as Java's: x > 0 ? 1 : -1

// Missing else: the missing branch yields (), the Unit value (like void),
// so the whole expression has type AnyVal
if (x > 0) 1
// equivalent to: if (x > 0) 1 else ()
for Loops
[IMP] Advanced for loops: generators, guards, yield
// Basic range
for (i <- 1 to 5) println(i)      // 1 2 3 4 5 (to = inclusive)
for (i <- 1 until 5) println(i)   // 1 2 3 4 (until = excludes last)
for (i <- 10 to 0 by -1) println(i) // countdown 10..0

// Multiple generators (nested loop effect)
for (i <- 1 to 3; j <- 1 to 3) print(f"${10*i+j}%3d")
// Output: 11 12 13 21 22 23 31 32 33

// Guard: a filter condition (a semicolon before if is optional, usually omitted)
for (i <- 1 to 3; j <- 1 to 3 if i != j) print(f"${10*i+j}%3d")
// Output: 12 13 21 23 31 32

// With definition inside loop
for (i <- 1 to 3; from = 4 - i; j <- from to 3) print(f"${10*i+j}%3d")
// Output: 13 22 23 31 32 33

// yield — builds a collection (for comprehension)
val result = for (i <- 1 to 10) yield i % 3
// Vector(1, 2, 0, 1, 2, 0, 1, 2, 0, 1)
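The compiler rewrites a for comprehension into calls to flatMap, withFilter and map. A sketch of the guard example above rewritten by hand, to show that the two forms produce the same result (the names viaFor and viaMethods are only illustrative):

```scala
// for comprehension: two generators, a guard and yield
val viaFor = for (i <- 1 to 3; j <- 1 to 3 if i != j) yield 10 * i + j

// Hand-written equivalent:
// outer generator -> flatMap, guard -> withFilter, last generator -> map
val viaMethods = (1 to 3).flatMap(i => (1 to 3).withFilter(j => i != j).map(j => 10 * i + j))

// Both hold 12, 13, 21, 23, 31, 32
```

This is why yield works on any type that provides map, flatMap and withFilter, not only on ranges.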
Functions
Defining Functions
// Basic — return type inferred (not needed unless recursive)
def abs(x: Double) = if (x >= 0) x else -x

// Multi-line — last expression is return value
def fac(n: Int) = {
  var r = 1
  for (i <- 1 to n) r = r * i
  r   // returned — no return keyword needed
}

// Recursive — MUST specify return type
def fac(n: Int): Int = if (n <= 0) 1 else n * fac(n - 1)

// Default arguments
def decorate(str: String, left: String = "[", right: String = "]") = left + str + right
decorate("Hello")           // "[Hello]"
decorate("Hello", "<<<", ">>>") // "<<<Hello>>>"

// Variable args
def sum(args: Int*) = { var r = 0; for (a <- args) r += a; r }
sum(1, 4, 9)         // 14
sum(1 to 5: _*)      // pass range as varargs

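// Procedure: returns Unit. The old form `def f(s: String) { ... }` (no = sign)
// is deprecated in Scala 2.13 and dropped in Scala 3, so declare `: Unit =`
def printBox(s: String): Unit = {
  val border = "-" * (s.length + 2)
  println(s"$border\n|$s|\n$border")
}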
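Default arguments pair naturally with named arguments, and an existing sequence can be passed to a varargs parameter with `: _*`. A small sketch (the `]<<<` string is only an illustrative value, and sum is rewritten with the built-in .sum for brevity):

```scala
// Same default-argument function as above
def decorate(str: String, left: String = "[", right: String = "]") = left + str + right

// Named argument: `left` keeps its default, only `right` is overridden
val named = decorate("Hello", right = "]<<<")   // "[Hello]<<<"

// Varargs sum, using the collection's built-in .sum
def sum(args: Int*) = args.sum

// `: _*` expands a sequence into separate varargs arguments
val total = sum(1 to 5: _*)                      // 15
val none  = sum()                                // 0: zero arguments is allowed
```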
[IMP] Exception Handling
// throw — type is Nothing
throw new IllegalArgumentException("x should not be negative")

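// try / catch / finally  (process, input and in are placeholders for your own code)
try {
  process(input)
} catch {
  // put the most specific exception type first
  case ex: IllegalArgumentException => println("Bad arg: " + ex.getMessage)
  case ex: Exception                 => println("Other error")
} finally {
  // ALWAYS runs, whether or not an exception was thrown: use it for cleanup (closing files etc.)
  in.close()
}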

// if/else with throw — type of expression is Double (other branch)
if (x >= 0) { Math.sqrt(x) }
else throw new IllegalArgumentException("x negative")
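Like if/else, try/catch is itself an expression, and throw (type Nothing) fits into any branch. A minimal sketch; parseOrDefault and safeSqrt are hypothetical helper names chosen for illustration:

```scala
// try/catch is an expression: it evaluates to the try result,
// or to the result of the case that caught the exception
def parseOrDefault(s: String, default: Int): Int =
  try { s.trim.toInt } catch { case _: NumberFormatException => default }

// throw has type Nothing, so this if/else still has type Double
def safeSqrt(x: Double): Double =
  if (x >= 0) math.sqrt(x) else throw new IllegalArgumentException("x negative")

val ok  = parseOrDefault(" 42 ", -1)   // 42
val bad = parseOrDefault("4x2", -1)    // -1: "4x2".toInt throws NumberFormatException
val r   = safeSqrt(9.0)                // 3.0
```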

Arrays, Maps & Tuples

Fixed arrays · ArrayBuffer · mutable/immutable Maps · Tuples · zip

🔥ArrayBuffer fruits program appeared VERBATIM in June 2025, Dec 2025, Aug 2025. Memorize it cold.
Fixed Array
val nums = new Array[Int](10)   // all zeros
val a = new Array[String](10)  // all null
val s = Array("Hi", "Bye")     // no 'new' with values
s(0)  // "Hi" — use () not []!

// 2D
val m = Array.ofDim[Double](3,4)
m(0)(1) = 42.0

// Useful methods
Array(1,7,2,9).sum     // 19
Array(1,7,2,9).max     // 9
Array(1,7,2,9).sorted  // Array(1,2,7,9)
Array(1,7,2,9).mkString(", ")  // "1, 7, 2, 9"
ArrayBuffer operations
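import scala.collection.mutable.ArrayBuffer
val b = ArrayBuffer[Int]()            // contents after each step shown on the right

b += 1                       // add one                  -> 1
b ++= Seq(1, 2, 3)           // add several              -> 1 1 2 3   (b += (1, 2, 3) is deprecated in 2.13)
b ++= Array(8, 13)           // append a collection      -> 1 1 2 3 8 13
b.trimEnd(2)                 // remove last 2            -> 1 1 2 3
b.insert(2, 6)               // insert at index 2        -> 1 1 6 2 3
b.insertAll(2, Seq(7, 8, 9)) // insert several at 2      -> 1 1 7 8 9 6 2 3
b.remove(2)                  // remove at index 2        -> 1 1 8 9 6 2 3
b.remove(2, 3)               // remove 3 from index 2    -> 1 1 2 3
b.sortWith(_ > _)            // NEW buffer, descending (b itself unchanged)
b.sorted                     // NEW buffer, ascending
b.toArray                    // convert to Array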
[PYQ x3] THE Fruits Program: write this from memory
import scala.collection.mutable.ArrayBuffer

// 1. Create with initial elements
val fruits = ArrayBuffer("Apple", "Banana", "Mango", "Orange")

// 2. Append Strawberry and Pineapple
fruits ++= Array("Strawberry", "Pineapple")
// ArrayBuffer(Apple, Banana, Mango, Orange, Strawberry, Pineapple)

// 3. Sort descending
val sorted = fruits.sortWith(_ > _)
// ArrayBuffer(Strawberry, Pineapple, Orange, Mango, Banana, Apple)

// 4. Remove last 2
sorted.trimEnd(2)
// ArrayBuffer(Strawberry, Pineapple, Orange, Mango)

// 5. Convert to Array
val arr = sorted.toArray
println(arr.mkString(", "))
// Output: Strawberry, Pineapple, Orange, Mango
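One detail the program relies on: sortWith returns a new ArrayBuffer and leaves fruits untouched, which is why the result is stored in sorted before trimming. A sketch that checks this; remove(index, count) is shown here only as an equivalent of trimEnd(2):

```scala
import scala.collection.mutable.ArrayBuffer

val fruits = ArrayBuffer("Apple", "Banana", "Mango", "Orange")
fruits ++= Array("Strawberry", "Pineapple")

val sorted = fruits.sortWith(_ > _)    // NEW buffer; fruits keeps its insertion order
sorted.remove(sorted.length - 2, 2)    // same effect as sorted.trimEnd(2)

val arr = sorted.toArray               // Strawberry, Pineapple, Orange, Mango
```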
Maps
[IMP] Mutable vs Immutable Maps
// IMMUTABLE (default) — cannot change
val scores = Map("Alice" -> 10, "Bob" -> 3, "Cindy" -> 8)

// MUTABLE — add/update/remove keys
val sc = scala.collection.mutable.Map("Alice" -> 10, "Bob" -> 3)
sc("Bob") = 20                       // update existing
sc("Fred") = 7                       // add new
sc ++= Seq("Alice" -> 15, "Zara" -> 5) // add multiple (sc += (a, b) is deprecated in 2.13)
sc -= "Alice"                        // remove key

// Accessing
val b1 = scores("Bob")                // 3 (throws NoSuchElementException if the key is missing)
val b2 = scores.getOrElse("Zoe", 0)   // safe: "Zoe" is missing, so this returns 0
scores.contains("Bob")               // true/false

// Iterating
for ((k, v) <- scores) println(s"$k -> $v")
scores.keySet                           // all keys
for (v <- scores.values) println(v)   // all values
for ((k, v) <- scores) yield (v, k)  // reverse map

// Sorted map
val sorted = scala.collection.mutable.SortedMap("Bob"->3, "Alice"->10)
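Because the default Map is immutable, "adding" or "removing" a key builds a new map and leaves the original alone. A sketch using the same scores map (withFred, bobUpdated, withoutAlice and current are just illustrative names):

```scala
val scores = Map("Alice" -> 10, "Bob" -> 3, "Cindy" -> 8)

// Each operation returns a NEW immutable map; scores is never modified
val withFred     = scores + ("Fred" -> 7)     // 4 entries
val bobUpdated   = scores.updated("Bob", 20)  // Bob -> 20, others unchanged
val withoutAlice = scores - "Alice"           // Bob and Cindy only

// A var can be rebound to a new map: m += kv is shorthand for m = m + kv
var current = scores
current += ("Dave" -> 5)
```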
Tuples
Tuples — group mixed types
// Create
val t = (1, 3.14, "Fred")  // type: (Int, Double, String)

// Access — starts at _1 (NOT 0!)
t._1  // 1
t._2  // 3.14
t._3  // "Fred"

// partition() returns pair of strings
"New York".partition(_.isUpper)  // ("NY", "ew ork")

// zip — bundle two arrays into pairs
val symbols = Array("<", "-", ">")
val counts  = Array(2, 10, 2)
val pairs = symbols.zip(counts)    // Array(("<",2), ("-",10), (">",2))
for ((s, n) <- pairs) print(s * n) // <<---------->>
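Besides _1, _2, _3, a tuple can be taken apart with a pattern, and unzip reverses zip. A short sketch (the val names are illustrative):

```scala
val t = (1, 3.14, "Fred")

// Pattern-match the tuple into separate vals by position
val (num, pi, name) = t

// Use _ for the parts you do not need
val (_, _, onlyName) = t

// unzip turns an array of pairs back into two arrays
val pairs = Array("<", "-", ">").zip(Array(2, 10, 2))
val (symbols, counts) = pairs.unzip   // Array(<, -, >) and Array(2, 10, 2)
```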

Apache Spark — Intro & Architecture

What is Spark · unified stack · architecture diagram · Spark vs Hadoop

What is Apache Spark?
  • Cluster computing platform — fast and general-purpose
  • Extends MapReduce to support interactive queries and stream processing
  • Key feature: runs computations in memory, which can make some workloads up to 100× faster than Hadoop MapReduce
  • Core abstraction: RDD (Resilient Distributed Dataset)
  • Two user groups: Data Scientists (ad-hoc analysis via shells) and Engineers (production pipelines)
Spark Unified Stack — Components
Component       | Purpose
Spark Core      | Task scheduling, memory management, fault recovery, RDD API. Foundation for everything.
Spark SQL       | Structured data via SQL and HiveQL. JSON, Parquet, Hive sources.
Spark Streaming | Real-time stream processing (log files, message queues). Micro-batching.
MLlib           | ML algorithms: classification, regression, clustering, collaborative filtering.
GraphX          | Graph computation (e.g. social networks). Operators such as subgraph, mapVertices.

Tight integration benefit: improving Spark Core automatically speeds up SQL, MLlib, streaming. One system instead of 5-10 separate ones.

Architecture — Draw This in Exam
[EXAM DIAGRAM] Spark Architecture
  +--------------------------------------------------+
  |              DRIVER PROGRAM                      |
  |  SparkContext  (creates RDDs, sends tasks)       |
  +---------------------------+----------------------+
                              |
              +---------------v------------------+
              |         CLUSTER MANAGER           |
              |   YARN  /  Mesos  /  Standalone   |
              +--------+----------+--------------+
                       |          |
            +----------v--+  +----v----------+
            | WORKER NODE |  | WORKER NODE   |
            | +----------+|  | +-----------+ |
            | | Executor ||  | | Executor  | |
            | | Task Task||  | | Task Task | |
            | | Cache    ||  | | Cache     | |
            | +----------+|  | +-----------+ |
            +-------------+  +---------------+
  • Driver — contains SparkContext, runs user's main(), creates task DAG
  • SparkContext — entry point, connects to cluster manager
  • Cluster Manager — allocates resources (supports YARN, Mesos, Standalone)
  • Worker Node — physical/virtual machine running executors
  • Executor — JVM process that runs tasks and caches data for the application
Storage Layers

Spark can read from: HDFS · Local filesystem · Amazon S3 · Cassandra · HBase · Hive

Spark does NOT require Hadoop — it just supports systems implementing Hadoop APIs. Supports text files, SequenceFiles, Avro, Parquet, any Hadoop InputFormat.

Spark vs Hadoop
Feature       | Apache Spark                              | Hadoop MapReduce
Speed         | Up to 100× faster for in-memory workloads | Slower (disk I/O between jobs)
Processing    | Real-time + Batch                         | Batch only
Difficulty    | Easy (high-level APIs)                    | Complex
Recovery      | Partition recovery via lineage            | Full fault-tolerance (replication)
Interactivity | Interactive shell (REPL)                  | No (except Pig/Hive)
Language      | Scala, Python, Java, R                    | Java primarily

RDDs — Resilient Distributed Datasets

Creating · transformations · actions · lazy evaluation · persistence

What is an RDD?
  • RDD = Resilient Distributed Dataset
  • An immutable, distributed collection of objects
  • Split into partitions across cluster nodes
  • Two types of operations: Transformations (lazy, return RDD) and Actions (eager, return value)
  • Spark tracks lineage — DAG of dependencies — for fault recovery
Creating RDDs
// From file (most common)
val lines = sc.textFile("README.md")

// From in-memory collection
val data = sc.parallelize(
  List("hello world", "hi")
)

// From another RDD (transformation)
val words = data.flatMap(_.split(" "))
Lazy Evaluation
val rdd = sc.textFile("file.txt")
// NOTHING runs yet — Spark records plan

val errors = rdd.filter(_.contains("error"))
// STILL nothing runs

errors.count()
// NOW Spark executes the full plan!

Transformations just build a DAG. Actions trigger execution.

⚠️Key rule: If return type is RDD → Transformation (lazy). If return type is a value/Unit → Action (triggers execution). Transformations can be chained; an Action at the end executes all of them.
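Scala's own collection views behave lazily in a similar way, which makes a handy local analogy. This is a sketch with plain Scala 2.13+ views, not Spark: views only defer work, while Spark additionally plans stages across a cluster. The counter calls is an illustrative device for seeing when work happens:

```scala
// Counts how many times the mapping function really runs
var calls = 0

// .view makes map/filter lazy, like RDD transformations: this line only builds a plan
val plan = (1 to 5).view.map { x => calls += 1; x * 10 }.filter(_ > 20)
val callsAfterPlan = calls            // 0: nothing has run yet

// Forcing the view (the "action") runs the whole chain once
val result = plan.toList              // List(30, 40, 50)
val callsAfterAction = calls          // 5: map ran once per element

// Forcing it again recomputes everything, like an RDD that was not persisted
val again = plan.toList
val callsAfterSecond = calls          // 10
```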
Common Transformations
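Transformation (one RDD)  | Result on {1,2,3,3}
map(x => x+1)             | {2,3,4,4}
filter(x => x!=1)         | {2,3,3}
flatMap(x => x.to(3))     | {1,2,3,2,3,3,3}
distinct()                | {1,2,3}
Transformation (two RDDs) | Result with rdd = {1,2,3}, other = {3,4,5}
union(other)              | {1,2,3,3,4,5}  (duplicates kept)
intersection(other)       | {3}  (duplicates removed)
subtract(other)           | {1,2}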
Common Actions
Action           | Result on {1,2,3,3}
collect()        | [1,2,3,3]
count()          | 4
first()          | 1
take(2)          | [1,2]
top(2)           | [3,3]
reduce(_ + _)    | 9
foreach(println) | prints each element (on the executors, not the driver)
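Both tables can be cross-checked on ordinary Scala collections, whose methods share most of the names. A sketch, assuming plain Lists stand in for RDDs (this is not Spark: on a cluster the element order is not guaranteed, and `++` / `filterNot` are the collection-side stand-ins for union / subtract):

```scala
val rdd = List(1, 2, 3, 3)

// One-collection transformations
val mapped   = rdd.map(x => x + 1)        // List(2, 3, 4, 4)
val filtered = rdd.filter(x => x != 1)    // List(2, 3, 3)
val flat     = rdd.flatMap(x => x.to(3))  // List(1, 2, 3, 2, 3, 3, 3)
val uniq     = rdd.distinct               // List(1, 2, 3)

// Two-collection transformations
val a = List(1, 2, 3)
val b = List(3, 4, 5)
val unioned    = a ++ b                          // List(1, 2, 3, 3, 4, 5): duplicates kept
val common     = a.intersect(b)                  // List(3)
val subtracted = a.filterNot(x => b.contains(x)) // List(1, 2): drops every matching element

// Actions
val n         = rdd.size                    // count(): 4
val firstElem = rdd.head                    // first(): 1
val firstTwo  = rdd.take(2)                 // take(2): List(1, 2)
val top2      = rdd.sortWith(_ > _).take(2) // top(2): List(3, 3)
val total     = rdd.reduce(_ + _)           // reduce(_ + _): 9
```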
map() vs flatMap()
// map: one element in, one element out
val sq = sc.parallelize(List(1,2,3,4)).map(x => x*x)
// {1, 4, 9, 16}

// flatMap: one in, ZERO or MORE out (then flatten)
val words = sc.parallelize(List("hello world", "hi"))
              .flatMap(_.split(" "))
// {"hello", "world", "hi"}
// map would give: {["hello","world"], ["hi"]}
Persistence / Caching
// Default: RDD recomputed on every action
// Use persist() to cache:
val result = input.map(x => x*x)
result.persist()   // or .cache(), which for RDDs means persist(StorageLevel.MEMORY_ONLY)
result.count()     // compute + cache
result.collect()   // uses cached data

Storage levels: MEMORY_ONLY · MEMORY_AND_DISK · DISK_ONLY · MEMORY_ONLY_SER

reduce() vs fold() vs aggregate()

The most-asked Unit 5 question — appears in EVERY paper

🔥The constraint: reduce() and fold() — return type MUST match RDD element type. aggregate() breaks this — can return ANY different type. This distinction IS the question.

reduce(f)

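No zero value. Return type = RDD element type. Applies the binary function repeatedly; the function should be commutative and associative, because partition results are combined in no fixed order.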

rdd.reduce((x,y) => x + y)
rdd.reduce(_ min _)
rdd.reduce(_ max _)

fold(zero)(f)

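HAS a zero value (identity element). Return type = RDD element type. The zero is used once at the start of every partition and once more when the partition results are merged, so it should be the identity for the operation.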

// 0 is identity for +
rdd.fold(0)((acc,x) => acc+x)
// 1 is identity for *
rdd.fold(1)(_ * _)

aggregate(zero)(seqOp, combOp)

BREAKS the constraint — return type can be ANYTHING. Needs two functions: seqOp and combOp.

rdd.aggregate((0,0))(
  (acc,v) => (acc._1+v, acc._2+1),
  (a,b) => (a._1+b._1, a._2+b._2)
)
// Returns (sum, count) tuple!
[KEY CONCEPT] The Constraint & How aggregate() Solves It
  • Constraint of reduce() and fold(): Return type MUST be same as RDD element type. RDD[Int] → must return Int.
  • Problem: computing an average needs (sum, count) as a tuple — a different type than Int.
  • aggregate() solves this: supply zeroValue of the desired return type, seqOp to combine element with accumulator, combOp to merge partition results.
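To see where the zero value and the two functions are applied, an RDD can be modelled as a list of partitions. This is a plain-Scala sketch, not Spark; modelFold, modelAggregate and Partitions are hypothetical names, and the model assumes, as Spark's RDD.fold does, that the zero is used once per partition and once more when merging:

```scala
// Hypothetical model: an "RDD" is just a list of partitions
type Partitions[T] = List[List[T]]

// fold: zero starts every partition AND starts the merge of partition results
def modelFold[T](parts: Partitions[T], zero: T)(op: (T, T) => T): T =
  parts.map(_.foldLeft(zero)(op)).foldLeft(zero)(op)

// aggregate: seqOp folds elements into an accumulator of type U, combOp merges accumulators
def modelAggregate[T, U](parts: Partitions[T], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U =
  parts.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

val parts: Partitions[Int] = List(List(1, 2), List(3, 4, 5))   // 1..5 in two partitions

val sum0  = modelFold(parts, 0)(_ + _)    // 15: 0 is the identity for +
val sum10 = modelFold(parts, 10)(_ + _)   // 45 = 15 + 10 * (2 partitions + 1 merge)

val (s, c) = modelAggregate(parts, (0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: add the value, count it
  (x, y)   => (x._1 + y._1, x._2 + y._2)  // combOp: merge two (sum, count) pairs
)
val avg = s.toDouble / c                  // 3.0
```

The 45 result shows why a non-identity zero gives surprising answers: the extra value is added once per partition plus once at the merge.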
[PYQ x3] Complete Example: all three
val listRdd = sc.parallelize(List(1,2,3,4,5))

// reduce() — no zero, same return type
println(listRdd.reduce(_ + _))   // 15
println(listRdd.reduce(_ min _)) // 1
println(listRdd.reduce(_ max _)) // 5

// fold() — has zero value, same return type
println(listRdd.fold(0)(_ + _))  // 15 (identity for + is 0)
println(listRdd.fold(1)(_ * _))  // 120 (identity for * is 1)

// aggregate() — DIFFERENT return type (tuple for average)
val result = listRdd.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),     // seqOp
  (a, b)   => (a._1 + b._1, a._2 + b._2)      // combOp
)
val avg = result._1 / result._2.toDouble
println(s"Sum=${result._1}, Count=${result._2}, Avg=$avg")
// Sum=15, Count=5, Avg=3.0

// Sum of RDD using aggregate (simpler example)
val listRdd2 = sc.parallelize(List(1,2,3,4,5,3,2))
def param0 = (accu: Int, v: Int) => accu + v
def param1 = (accu1: Int, accu2: Int) => accu1 + accu2
val sum = listRdd2.aggregate(0)(param0, param1)
// Output: 20
Summary Table
Function                  | Zero value? | Return type constraint | Use when
reduce(f)                 | No          | Same as RDD elements   | Simple sum, min, max
fold(zero)(f)             | Yes         | Same as RDD elements   | Need identity value
aggregate(zero)(seq,comb) | Yes         | ANY type!              | Need different return type
reduceByKey(f)            | No          | Same as value type     | Pair RDD: merge values per key (a transformation)

Spark SQL

HiveContext · SQLContext · SchemaRDD · loading data · UDFs

What is Spark SQL?
  • Spark's interface for structured and semi-structured data (data with a known schema)
  • 3 capabilities: load from JSON/Hive/Parquet · query with SQL/HQL · mix SQL with RDD code
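  • Uses SchemaRDD: an RDD of Row objects that knows its schema (renamed DataFrame in Spark 1.3)
  • Two entry points: HiveContext (Hive support) vs SQLContext (no Hive). Both are Spark 1.x APIs; Spark 2.x replaces them with SparkSession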
[EXAM CODE] Spark SQL: Full Example (Scala)
import org.apache.spark.sql.hive.HiveContext

// 1. Create SparkContext, then HiveContext
val sc = new SparkContext(...)
val hiveCtx = new HiveContext(sc)

// 2. Load JSON data
val input = hiveCtx.jsonFile(inputFile)

// 3. Register as temporary table
input.registerTempTable("tweets")

// 4. Query with SQL
val topTweets = hiveCtx.sql(   // DESC so the most-retweeted tweets come first
  "SELECT text, retweetCount FROM tweets ORDER BY retweetCount DESC LIMIT 10"
)

// 5. Access results
topTweets.collect().foreach(println)

// Access by column index
val topTweetText = topTweets.map(row => row.getString(0))
SchemaRDD
  • RDD of Row objects, each = one record
  • Knows its schema (column names and types)
  • Can be used as regular RDD (map, filter)
  • Register as temp table for SQL: rdd.registerTempTable("name")
  • Row access: row.getString(0), row.getInt(1)
Caching in Spark SQL
// Use cacheTable — columnar format (more efficient)
hiveCtx.cacheTable("tableName")

// NOT the same as RDD.persist()
// Stores as in-memory COLUMNAR format
// Lost when driver exits
Loading Data
// JSON file
val df = hiveCtx.jsonFile("data.json")

// Hive table
val rows = hiveCtx.sql("SELECT key, value FROM mytable")
val keys = rows.map(row => row.getInt(0))

// From RDD via case class (Scala)
import hiveCtx._   // implicit conversion from an RDD of case classes to a SchemaRDD (Spark 1.x)
case class HappyPerson(name: String, drink: String)
val rdd = sc.parallelize(List(HappyPerson("Alice", "coffee")))
rdd.registerTempTable("happy_people")
hiveCtx.sql("SELECT name FROM happy_people").collect()
UDFs — User Defined Functions
// Register a Scala function as SQL UDF
hiveCtx.registerFunction("strLenScala", (_: String).length)
val r = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")

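// In Python (IntegerType lives in pyspark.sql.types from Spark 1.3 on)
from pyspark.sql.types import IntegerType
hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())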

All Must-Know Programs — PYQ

Write each from memory. These are the ones that appear verbatim.

⚠️Programs 1, 2, 3, 4 appeared in 3+ papers. Know them cold.
[PYQ x4] P1: Spark WordCount
val data = sc.textFile("sparkdata.txt")

val splitdata = data.flatMap(line => line.split(" "))
// lines → individual words

val mapdata = splitdata.map(word => (word, 1))
// words → (word, 1) pairs

val reducedata = mapdata.reduceByKey(_ + _)
// ("hello",1),("hello",1) → ("hello",2)

reducedata.collect().foreach(println)
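The same three-step pipeline (flatMap, map to pairs, sum per key) can be rehearsed without a cluster on ordinary Scala collections. A sketch assuming Scala 2.13+ (groupMapReduce plays the role of reduceByKey here; the sample lines are made up):

```scala
// Made-up sample text standing in for sc.textFile("sparkdata.txt")
val lines = List("hello world", "hello spark", "spark is fast")

val counts = lines
  .flatMap(line => line.split(" "))     // lines -> individual words
  .map(word => (word, 1))               // words -> (word, 1) pairs
  .groupMapReduce(_._1)(_._2)(_ + _)    // like reduceByKey(_ + _): sum the 1s per word
```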
[PYQ x3] P2: ArrayBuffer Fruits (write verbatim)
import scala.collection.mutable.ArrayBuffer
val fruits = ArrayBuffer("Apple", "Banana", "Mango", "Orange")
fruits ++= Array("Strawberry", "Pineapple")
val sorted = fruits.sortWith(_ > _)
sorted.trimEnd(2)
val arr = sorted.toArray
println(arr.mkString(", "))
[PYQ x3] P3: reduce / fold / aggregate with constraint
val rdd = sc.parallelize(List(1,2,3,4,5))

// reduce — same return type, no zero
println(rdd.reduce(_ + _))   // 15
println(rdd.reduce(_ min _)) // 1

// fold — same return type, has zero value
println(rdd.fold(0)(_ + _))  // 15
println(rdd.fold(1)(_ * _))  // 120

// aggregate — DIFFERENT return type (compute average)
val result = rdd.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),    // seqOp
  (a, b)   => (a._1 + b._1, a._2 + b._2)     // combOp
)
val avg = result._1 / result._2.toDouble
println(s"Avg = $avg")  // Avg = 3.0
[PYQ x2] P4: Mutable Map with student scores
// Immutable Map
val immScores = Map("Alice" -> 80, "Bob" -> 75, "Cindy" -> 90)

// Mutable copy
val mutScores = scala.collection.mutable.Map(immScores.toSeq: _*)
mutScores("Dave") = 85    // add new
mutScores("Bob") = 88     // update

// Add bonus +5 to all
val withBonus = mutScores.map { case (k, v) => (k, v + 5) }

// Filter > 50
withBonus.filter(_._2 > 50).foreach { case (n, s) => println(s"$n: $s") }
[PYQ] P5: Exception handling (throw + try)
object ExceptionDemo {
  def divide(a: Int, b: Int): Int = {
    if (b == 0) throw new ArithmeticException("Division by zero!")
    a / b
  }
  def main(args: Array[String]): Unit = {
    try {
      println(divide(10, 2))  // 5
      println(divide(10, 0))  // throws!
    } catch {
      case e: ArithmeticException => println("Error: " + e.getMessage)
      case e: Exception            => println("Unknown error")
    } finally {
      println("Finally always runs")
    }
  }
}
[PYQ] P6: Words starting with 'a', ascending count
val rdd = sc.textFile("test.txt")
val rdd2 = rdd.flatMap(f => f.split(" "))
val rdd3 = rdd2.map(m => (m, 1))
val rdd4 = rdd3.filter(a => a._1.startsWith("a"))
val rdd5 = rdd4.reduceByKey(_ + _)
val rdd6 = rdd5.map(a => (a._2, a._1)).sortByKey()
rdd6.collect().foreach(println)   // collect() first so output prints on the driver, in sorted order
[PYQ] P7: Generic search function
def searchItem[T](collection: Seq[T], item: T): Boolean = {
  collection.contains(item)
}

// Linear search returning index
def linearSearch(arr: Array[Int], target: Int): Int = {
  for (i <- arr.indices) {
    if (arr(i) == target) return i
  }
  -1
}

println(searchItem(Seq(1,2,3), 3))  // true
println(linearSearch(Array(101,205,309), 205)) // 1
[PYQ] P8: Odd / Even 1 to 100
object OddEven {
  def printOdd(): Unit = for(i <- 1 to 100 if i%2!=0) print(s"$i ")
  def printEven(): Unit = for(i <- 1 to 100 if i%2==0) print(s"$i ")
  def main(args: Array[String]): Unit = {
    println("Odd:"); printOdd()
    println("\nEven:"); printEven()
  }
}
[PYQ] P9: Sum of comma-separated numbers (Spark)
val nums = sc.textFile("numbers.txt")
              .flatMap(_.split(","))
              .map(_.trim.toInt)
println("Sum = " + nums.reduce(_ + _))
[PYQ] P10: lazy val vs val vs def demo
// val — evaluated at definition time
val a = { println("val evaluated"); 42 }
// prints "val evaluated" IMMEDIATELY

// lazy val — evaluated first time accessed
lazy val b = { println("lazy val evaluated"); 42 }
// nothing printed yet
println(b)
// NOW prints "lazy val evaluated", then 42
println(b)
// prints just 42 (already evaluated and cached)

// def — evaluated every time called
def c = { println("def evaluated"); 42 }
println(c)  // prints "def evaluated", 42
println(c)  // prints "def evaluated", 42 (again!)