val · var · lazy val · def · if/else · for loops · functions · exceptions
// Four ways to bind a name to expensive().
// Each gets its own name here; redefining the same x in one scope would not compile.
val xVal = expensive()       // evaluated IMMEDIATELY, stored, IMMUTABLE
var xVar = expensive()       // evaluated IMMEDIATELY, stored, MUTABLE
lazy val xLazy = expensive() // evaluated FIRST TIME accessed, then stored
def xDef = expensive()       // evaluated EVERY TIME called, never stored
| Keyword | When evaluated | Stored? | Mutable? |
|---|---|---|---|
| val | At definition time | Yes | No (immutable) |
| var | At definition time | Yes | Yes |
| lazy val | First access only | Yes (after 1st use) | No |
| def | Every single call | Never | N/A |
lazy val = halfway between val and def: like val it is evaluated at most once and then cached, but like def nothing runs until the name is first used.
val answer = 8 * 5 + 2   // Int, inferred
val s: String = "Hello"  // explicit type
var counter = 0
counter = 1              // OK for var
// answer = 0            // ERROR: reassignment to val (val is immutable)
val xmax, ymax = 100     // both = 100
// In Scala, if/else HAS a value
val s = if (x > 0) 1 else -1 // same as Java's: x > 0 ? 1 : -1

// Missing else = Unit (like void)
if (x > 0) 1                 // equivalent to: if (x > 0) 1 else ()
                             // so the expression's type is AnyVal (common supertype of Int and Unit)
// Basic range
for (i <- 1 to 5) println(i)        // 1 2 3 4 5 (to = inclusive)
for (i <- 1 until 5) println(i)     // 1 2 3 4   (until = excludes last)
for (i <- 10 to 0 by -1) println(i) // countdown 10..0

// Multiple generators (nested loop effect)
for (i <- 1 to 3; j <- 1 to 3) print(f"${10 * i + j}%3d")
// Output: 11 12 13 21 22 23 31 32 33

// Guard — filter condition (NO semicolon before if!)
for (i <- 1 to 3; j <- 1 to 3 if i != j) print(f"${10 * i + j}%3d")
// Output: 12 13 21 23 31 32

// With a definition inside the loop header
for (i <- 1 to 3; from = 4 - i; j <- from to 3) print(f"${10 * i + j}%3d")
// Output: 13 22 23 31 32 33

// yield — builds a collection (for comprehension)
val result = for (i <- 1 to 10) yield i % 3
// Vector(1, 2, 0, 1, 2, 0, 1, 2, 0, 1)
// Basic — return type inferred (an explicit type is needed only for recursion)
def abs(x: Double) = if (x >= 0) x else -x

// Multi-line — the last expression is the return value
def facLoop(n: Int) = {
  var r = 1
  for (i <- 1 to n) r = r * i
  r // returned — no return keyword needed
}

// Recursive — MUST specify the return type
def fac(n: Int): Int = if (n <= 0) 1 else n * fac(n - 1)

// Default arguments
def decorate(str: String, left: String = "[", right: String = "]") =
  left + str + right
decorate("Hello")               // "[Hello]"
decorate("Hello", "<<<", ">>>") // "<<<Hello>>>"

// Variable args
def sum(args: Int*) = {
  var r = 0
  for (a <- args) r += a
  r
}
sum(1, 4, 9)    // 14
sum(1 to 5: _*) // 15 — pass a range as varargs

// Procedure (returns Unit). The old form without "=", `def printBox(s: String) { ... }`,
// is deprecated in Scala 2.13 and removed in Scala 3, so declare ": Unit =" explicitly.
def printBox(s: String): Unit = {
  val border = "-" * (s.length + 2)
  println(s"$border\n|$s|\n$border")
}
// throw — the throw expression has type Nothing
// (shown standalone; run as a script, execution would stop here)
throw new IllegalArgumentException("x should not be negative")

// try / catch / finally
try {
  process(input)
} catch {
  case ex: IllegalArgumentException => println("Bad arg: " + ex.getMessage)
  case _: Exception                 => println("Other error")
} finally {
  // ALWAYS runs — use for cleanup (close files etc.)
  in.close()
}

// if/else with throw — the whole expression has type Double:
// Nothing is a subtype of every type, so the other branch decides.
if (x >= 0) {
  Math.sqrt(x)
} else throw new IllegalArgumentException("x negative")
Fixed arrays · ArrayBuffer · mutable/immutable Maps · Tuples · zip
val nums = new Array[Int](10) // all zeros
val a = new Array[String](10) // all null
val s = Array("Hi", "Bye")    // no 'new' when you supply the values
s(0)                          // "Hi" — use () not []!

// 2D
val m = Array.ofDim[Double](3, 4)
m(0)(1) = 42.0

// Useful methods
Array(1, 7, 2, 9).sum            // 19
Array(1, 7, 2, 9).max            // 9
Array(1, 7, 2, 9).sorted         // Array(1, 2, 7, 9)
Array(1, 7, 2, 9).mkString(", ") // "1, 7, 2, 9"
// (a.mkString(", ") would give "null, null, ..." because a is all null)
import scala.collection.mutable.ArrayBuffer

val b = ArrayBuffer[Int]()
b += 1             // add one                         -> 1
b ++= Seq(1, 2, 3) // add several (old form b += (1, 2, 3) is deprecated in 2.13)
                   //                                 -> 1 1 2 3
b ++= Array(8, 13) // append a collection             -> 1 1 2 3 8 13
b.trimEnd(2)       // remove last 2 (2.13: dropRightInPlace) -> 1 1 2 3
b.insert(2, 6)     // insert 6 at index 2             -> 1 1 6 2 3
b.remove(2)        // remove the element at index 2   -> 1 1 2 3
b.remove(1, 2)     // remove 2 elements from index 1  -> 1 3
                   // (index + count must not pass the end, else IndexOutOfBoundsException)

// These return NEW collections; b itself is unchanged
b.sortWith(_ > _)  // sort descending -> ArrayBuffer(3, 1)
b.sorted           // sort ascending  -> ArrayBuffer(1, 3)
b.toArray          // convert to Array
import scala.collection.mutable.ArrayBuffer

// 1. Create with initial elements
val fruits = ArrayBuffer("Apple", "Banana", "Mango", "Orange")

// 2. Append Strawberry and Pineapple
fruits ++= Array("Strawberry", "Pineapple")
// ArrayBuffer(Apple, Banana, Mango, Orange, Strawberry, Pineapple)

// 3. Sort descending (returns a NEW ArrayBuffer; fruits is unchanged)
val sorted = fruits.sortWith(_ > _)
// ArrayBuffer(Strawberry, Pineapple, Orange, Mango, Banana, Apple)

// 4. Remove last 2 (mutates sorted in place)
sorted.trimEnd(2)
// ArrayBuffer(Strawberry, Pineapple, Orange, Mango)

// 5. Convert to Array
val arr = sorted.toArray
println(arr.mkString(", "))
// Output: Strawberry, Pineapple, Orange, Mango
// IMMUTABLE (default) — cannot change
val scores = Map("Alice" -> 10, "Bob" -> 3, "Cindy" -> 8)

// MUTABLE — add/update/remove keys
val sc = scala.collection.mutable.Map("Alice" -> 10, "Bob" -> 3)
sc("Bob") = 20                         // update existing
sc("Fred") = 7                         // add new
sc ++= Seq("Alice" -> 15, "Zara" -> 5) // add multiple (old form sc += (a, b) is deprecated in 2.13)
sc -= "Alice"                          // remove key

// Accessing
val b = scores("Bob")                  // 3 (throws NoSuchElementException if not found)
val bSafe = scores.getOrElse("Bob", 0) // safe — returns 0 if missing
scores.contains("Bob")                 // true/false

// Iterating
for ((k, v) <- scores) println(s"$k -> $v")
scores.keySet                          // all keys
for (v <- scores.values) println(v)    // all values
val reversed = for ((k, v) <- scores) yield (v, k) // reverse map: Map(10 -> Alice, 3 -> Bob, 8 -> Cindy)

// Sorted map (keys kept in sorted order)
val sorted = scala.collection.mutable.SortedMap("Bob" -> 3, "Alice" -> 10)
// Create
val t = (1, 3.14, "Fred") // type: (Int, Double, String)

// Access — starts at _1 (NOT 0!)
t._1 // 1
t._2 // 3.14
t._3 // "Fred"

// partition() returns a pair of strings: (chars matching, chars not matching)
"New York".partition(_.isUpper) // ("NY", "ew ork")

// zip — bundle two arrays into pairs
val symbols = Array("<", "-", ">")
val counts = Array(2, 10, 2)
val pairs = symbols.zip(counts) // Array(("<", 2), ("-", 10), (">", 2))
for ((s, n) <- pairs) print(s * n) // <<---------->>
What is Spark · unified stack · architecture diagram · Spark vs Hadoop
| Component | Purpose |
|---|---|
| Spark Core | Task scheduling, memory management, fault recovery, RDD API. Foundation for everything. |
| Spark SQL | Structured data via SQL and HiveQL. JSON, Parquet, Hive sources. |
| Spark Streaming | Real-time stream processing (log files, message queues). Micro-batching. |
| MLlib | ML algorithms — classification, regression, clustering, collaborative filtering. |
| GraphX | Graph computation — social networks. subgraph, mapVertices operators. |
Tight integration benefit: improving Spark Core automatically speeds up SQL, MLlib, streaming. One system instead of 5-10 separate ones.
+--------------------------------------------------+
| DRIVER PROGRAM |
| SparkContext (creates RDDs, sends tasks) |
+---------------------------+----------------------+
|
+---------------v------------------+
| CLUSTER MANAGER |
| YARN / Mesos / Standalone |
+--------+----------+--------------+
| |
+----------v--+ +----v----------+
| WORKER NODE | | WORKER NODE |
| +----------+| | +-----------+ |
| | Executor || | | Executor | |
| | Task Task|| | | Task Task | |
| | Cache || | | Cache | |
| +----------+| | +-----------+ |
+-------------+ +---------------+
Spark can read from: HDFS, the local filesystem, Amazon S3, Cassandra, HBase, Hive.
Spark does NOT require Hadoop — it just supports systems implementing Hadoop APIs. Supports text files, SequenceFiles, Avro, Parquet, any Hadoop InputFormat.
| Feature | Apache Spark | Hadoop MapReduce |
|---|---|---|
| Speed | Up to 100× faster for in-memory workloads | Slower (writes intermediate results to disk) |
| Processing | Batch + near-real-time streaming (micro-batches) | Batch only |
| Difficulty | Easy (high-level APIs) | Complex |
| Recovery | Fault-tolerant: lost partitions are recomputed from lineage | Fault-tolerant: failed tasks are re-run; data is replicated in HDFS |
| Interactivity | Interactive shell (REPL) | No (except Pig/Hive) |
| Language | Scala, Python, Java, R | Java primarily |
Creating · transformations · actions · lazy evaluation · persistence
// From a file (most common)
val lines = sc.textFile("README.md")

// From an in-memory collection
val data = sc.parallelize(List("hello world", "hi"))

// From another RDD (transformation)
val words = data.flatMap(_.split(" "))
val rdd = sc.textFile("file.txt")            // NOTHING runs yet — Spark only records the plan
val errors = rdd.filter(_.contains("error")) // STILL nothing runs (transformation)
errors.count()                               // action: NOW Spark executes the full plan!
Transformations just build a DAG. Actions trigger execution.
| Function | Result on {1,2,3,3} |
|---|---|
| map(x => x+1) | {2,3,4,4} |
| filter(x => x!=1) | {2,3,3} |
| flatMap(x => x.to(3)) | {1,2,3,2,3,3,3} |
| distinct() | {1,2,3} |
| union(other {3,4,5}) | {1,2,3,3,4,5} |
| intersection(other) | {3} |
| subtract(other) | {1,2} |
| Action | Result on {1,2,3,3} |
|---|---|
| collect() | [1,2,3,3] |
| count() | 4 |
| first() | 1 |
| take(2) | [1,2] |
| top(2) | [3,3] |
| reduce(_ + _) | 9 |
| foreach(println) | prints each |
// map: one element in, one element out
val sq = sc.parallelize(List(1, 2, 3, 4)).map(x => x * x)
// {1, 4, 9, 16}

// flatMap: one in, ZERO or MORE out (then flattened)
val words = sc.parallelize(List("hello world", "hi"))
  .flatMap(_.split(" "))
// {"hello", "world", "hi"}
// map would give: {["hello", "world"], ["hi"]}
// Default: an RDD is recomputed on every action.
// Use persist() to cache it:
val result = input.map(x => x * x)
result.persist()  // or .cache() (for RDDs: persist(StorageLevel.MEMORY_ONLY))
result.count()    // first action: compute + cache
result.collect()  // reuses the cached data
Storage levels: MEMORY_ONLY · MEMORY_AND_DISK · DISK_ONLY · MEMORY_ONLY_SER
The most-asked Unit 5 question — appears in EVERY paper
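No zero value. Return type = RDD element type. Applies the binary function repeatedly; the function must be commutative and associative, because partition results are combined in no fixed order. Throws on an empty RDD.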
// reduce: no zero value (throws UnsupportedOperationException on an empty RDD)
rdd.reduce((x, y) => x + y) // sum
rdd.reduce(_ min _)         // minimum
rdd.reduce(_ max _)         // maximum
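HAS zero value (should be the identity element). Return type = RDD element type. The zero value is the starting accumulator in each partition and is used again when the partition results are merged. A non-identity zero, e.g. fold(10)(_ + _), is therefore counted once per partition plus once more.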
// 0 is the identity for +
rdd.fold(0)((acc, x) => acc + x)

// 1 is the identity for *
rdd.fold(1)(_ * _)
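BREAKS the constraint — return type can be ANYTHING. Needs a zero value plus two functions: seqOp adds one element to a partition's accumulator, and combOp merges the accumulators of two partitions.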
rdd.aggregate((0, 0))(
  // seqOp: fold one element into a partition's running (sum, count)
  (partial, elem) => (partial._1 + elem, partial._2 + 1),
  // combOp: merge the (sum, count) pairs of two partitions
  (left, right) => (left._1 + right._1, left._2 + right._2)
)
// Returns a (sum, count) tuple — a different type from the Int elements!
val listRdd = sc.parallelize(List(1, 2, 3, 4, 5))

// reduce() — no zero value, same return type
println(listRdd.reduce(_ + _))   // 15
println(listRdd.reduce(_ min _)) // 1
println(listRdd.reduce(_ max _)) // 5

// fold() — has a zero value, same return type
println(listRdd.fold(0)(_ + _))  // 15  (identity for + is 0)
println(listRdd.fold(1)(_ * _))  // 120 (identity for * is 1)

// aggregate() — DIFFERENT return type (a (sum, count) tuple, used for the average)
val result = listRdd.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1), // seqOp: within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2)  // combOp: across partitions
)
val avg = result._1 / result._2.toDouble // toDouble avoids integer division
println(s"Sum=${result._1}, Count=${result._2}, Avg=$avg")
// Sum=15, Count=5, Avg=3.0

// Sum of an RDD using aggregate (simpler example: accumulator type = element type)
val listRdd2 = sc.parallelize(List(1, 2, 3, 4, 5, 3, 2))
val param0 = (accu: Int, v: Int) => accu + v           // seqOp
val param1 = (accu1: Int, accu2: Int) => accu1 + accu2 // combOp
val sum = listRdd2.aggregate(0)(param0, param1)
println(sum) // Output: 20
| Function | Zero value? | Return type constraint | Use when |
|---|---|---|---|
| reduce(f) | No | Same as RDD elements | Simple sum, min, max |
| fold(zero)(f) | Yes | Same as RDD elements | Need identity value |
| aggregate(zero)(seq,comb) | Yes | ANY type! | Need different return type |
| reduceByKey(f) | No | Same as value type | Pair RDD — combine the values of each key (a transformation: returns an RDD) |
HiveContext · SQLContext · SchemaRDD · loading data · UDFs
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

// 1. Create a SparkContext, then a HiveContext on top of it
val sc = new SparkContext(conf) // conf: your SparkConf (app name, master, ...)
val hiveCtx = new HiveContext(sc)

// 2. Load JSON data (Spark 1.x API; Spark 1.4+ also offers hiveCtx.read.json)
val input = hiveCtx.jsonFile(inputFile)

// 3. Register as a temporary table
input.registerTempTable("tweets")

// 4. Query with SQL — DESC so the MOST-retweeted tweets come first
//    (without DESC, ORDER BY is ascending and returns the least-retweeted)
val topTweets = hiveCtx.sql(
  "SELECT text, retweetCount FROM tweets ORDER BY retweetCount DESC LIMIT 10")

// 5. Access results
topTweets.collect().foreach(println)
// Access a column by index
val topTweetText = topTweets.map(row => row.getString(0))
// Key calls:
//   rdd.registerTempTable("name")   — expose an RDD/SchemaRDD as a SQL table
//   row.getString(0), row.getInt(1) — read Row fields by column index

// Use cacheTable — in-memory COLUMNAR format (more efficient)
hiveCtx.cacheTable("tableName")
// NOT the same as RDD.persist()
// Lost when the driver exits
// JSON file
val df = hiveCtx.jsonFile("data.json")

// Hive table
val rows = hiveCtx.sql("SELECT key, value FROM mytable")
val keys = rows.map(row => row.getInt(0))

// From an RDD via a case class (Scala)
// registerTempTable on a plain RDD needs the context's implicit conversion in scope.
// This form targets Spark 1.0–1.2; in Spark 1.3+ use
// import hiveCtx.implicits._ and rdd.toDF().registerTempTable(...) — verify against your version.
import hiveCtx._
case class HappyPerson(name: String, drink: String)
val rdd = sc.parallelize(List(HappyPerson("Alice", "coffee")))
rdd.registerTempTable("happy_people")
hiveCtx.sql("SELECT name FROM happy_people").collect()
// Register a Scala function as a SQL UDF (Spark 1.x; Spark 1.3+ also offers hiveCtx.udf.register)
hiveCtx.registerFunction("strLenScala", (_: String).length)
val r = hiveCtx.sql("SELECT strLenScala('tweet') FROM tweets LIMIT 10")

// In Python (PySpark) the equivalent is:
//   hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType())
Write each from memory. These are the ones that appear verbatim.
val data = sc.textFile("sparkdata.txt")
// lines -> individual words (split("\\s+") would also absorb runs of spaces)
val splitdata = data.flatMap(line => line.split(" "))
val mapdata = splitdata.map(word => (word, 1)) // words -> (word, 1) pairs
val reducedata = mapdata.reduceByKey(_ + _)    // ("hello",1),("hello",1) -> ("hello",2)
reducedata.collect().foreach(println)
import scala.collection.mutable.ArrayBuffer

val fruits = ArrayBuffer("Apple", "Banana", "Mango", "Orange")
fruits ++= Array("Strawberry", "Pineapple")
val sorted = fruits.sortWith(_ > _) // new buffer, sorted descending
sorted.trimEnd(2)                   // drop the last 2 in place
val arr = sorted.toArray
println(arr.mkString(", "))         // Strawberry, Pineapple, Orange, Mango
val rdd = sc.parallelize(List(1, 2, 3, 4, 5))

// reduce — same return type, no zero
println(rdd.reduce(_ + _))   // 15
println(rdd.reduce(_ min _)) // 1

// fold — same return type, has zero value
println(rdd.fold(0)(_ + _))  // 15
println(rdd.fold(1)(_ * _))  // 120

// aggregate — DIFFERENT return type (compute average)
val result = rdd.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1), // seqOp
  (a, b) => (a._1 + b._1, a._2 + b._2)  // combOp
)
val avg = result._1 / result._2.toDouble
println(s"Avg = $avg") // Avg = 3.0
// Immutable Map
val immScores = Map("Alice" -> 80, "Bob" -> 75, "Cindy" -> 90)

// Mutable copy
val mutScores = scala.collection.mutable.Map(immScores.toSeq: _*)
mutScores("Dave") = 85 // add new
mutScores("Bob") = 88  // update

// Add bonus +5 to all
val withBonus = mutScores.map { case (k, v) => (k, v + 5) }

// Filter > 50 (print order of a mutable hash map is not guaranteed)
withBonus.filter(_._2 > 50).foreach { case (n, s) => println(s"$n: $s") }
object ExceptionDemo {
  /** Integer division that rejects a zero divisor with a descriptive message.
    *
    * @throws ArithmeticException if b == 0
    */
  def divide(a: Int, b: Int): Int = {
    // Int division by zero would throw anyway ("/ by zero"); the explicit
    // check just produces a clearer message.
    if (b == 0) throw new ArithmeticException("Division by zero!")
    a / b
  }

  def main(args: Array[String]): Unit = {
    try {
      println(divide(10, 2)) // 5
      println(divide(10, 0)) // throws!
    } catch {
      case e: ArithmeticException => println("Error: " + e.getMessage)
      case e: Exception           => println("Unknown error")
    } finally {
      println("Finally always runs")
    }
  }
}
val rdd = sc.textFile("test.txt")
val rdd2 = rdd.flatMap(f => f.split(" "))         // lines -> words
val rdd3 = rdd2.map(m => (m, 1))                   // word -> (word, 1)
val rdd4 = rdd3.filter(a => a._1.startsWith("a"))  // keep words starting with "a"
val rdd5 = rdd4.reduceByKey(_ + _)                 // (word, count)
val rdd6 = rdd5.map(a => (a._2, a._1)).sortByKey() // (count, word), ascending by count
// collect() brings the sorted result to the driver so it prints in order;
// rdd6.foreach(println) would print on the executors, in no guaranteed order.
rdd6.collect().foreach(println)
// Generic membership test: true if item occurs in collection
def searchItem[T](collection: Seq[T], item: T): Boolean =
  collection.contains(item)

// Linear search: index of the first element equal to target, or -1 if absent.
// Scans indices left to right without `return` inside a for loop
// (that compiles to an exception-based non-local return).
def linearSearch(arr: Array[Int], target: Int): Int =
  arr.indices.find(i => arr(i) == target).getOrElse(-1)

println(searchItem(Seq(1, 2, 3), 3))             // true
println(linearSearch(Array(101, 205, 309), 205)) // 1
object OddEven {
  /** Prints the odd numbers from 1 to 100 on one line, each followed by a space. */
  def printOdd(): Unit = for (i <- 1 to 100 if i % 2 != 0) print(s"$i ")

  /** Prints the even numbers from 1 to 100 on one line, each followed by a space. */
  def printEven(): Unit = for (i <- 1 to 100 if i % 2 == 0) print(s"$i ")

  def main(args: Array[String]): Unit = {
    println("Odd:")
    printOdd()
    println("\nEven:")
    printEven()
  }
}
val nums = sc.textFile("numbers.txt")
  .flatMap(_.split(","))
  .map(_.trim)
  .filter(_.nonEmpty) // skip blanks from trailing commas / empty lines ("".toInt would throw)
  .map(_.toInt)
// fold(0) instead of reduce: an empty file gives 0 instead of throwing
println("Sum = " + nums.fold(0)(_ + _))
// val — evaluated at definition time
val a = { println("val evaluated"); 42 } // prints "val evaluated" IMMEDIATELY

// lazy val — evaluated the first time it is accessed
lazy val b = { println("lazy val evaluated"); 42 } // nothing printed yet
println(b) // NOW prints "lazy val evaluated", then 42
println(b) // prints just 42 (already evaluated and cached)

// def — evaluated every time it is called
def c = { println("def evaluated"); 42 }
println(c) // prints "def evaluated", 42
println(c) // prints "def evaluated", 42 (again!)