Question
- How is the number of partitions decided by Spark?
- Do I need to specify the number of available CPU cores explicitly somewhere so that the number of partitions matches it (e.g., the numSlices argument of the parallelize method, which would then require updating the program whenever the number of cores changes)?
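For reference, a minimal sketch (the RDD contents are placeholders) of reading sc.defaultParallelism at runtime instead of hard-coding a core count; parallelize falls back to this value when numSlices is omitted:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("PartitionCheck"))
// Default parallelism reported by the scheduler backend.
println(s"sc.defaultParallelism = ${sc.defaultParallelism}")
// Passing it explicitly ties the partition count to the cluster, not to the source code.
val rdd = sc.parallelize(1 to 1000, sc.defaultParallelism)
println(s"Partitions = ${rdd.partitions.length}")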
Background
Installed a Spark cluster as described in Environment, with no changes to spark-env.sh, spark-defaults.conf, or the SparkConf objects in the programs.
For the N Queen program, the number of partitions was 2 and tasks were assigned to only one node. For the word count program, the number of partitions was 22 and tasks were allocated to all nodes. Both programs were run with spark-submit.
Programs
N Queen
val sparkConf = new SparkConf().setAppName("NQueen").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
def isSafe(column: Int, placement: List[Int]): Boolean = { ... }
def placeQueensAt(row: Int, placements: Set[List[Int]]): Set[List[Int]] = { ... }
val initial = sc.parallelize(queensAtFirst)
//val initial = sc.parallelize(queensAtFirst, 12)
println("Partitions = %d".format(initial.partitions.size))
val result = initial.flatMap(x => placeQueensAt(1, Set(x))).collect()
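If the 2 partitions produced by the plain parallelize call are the bottleneck, one workaround (a sketch, not necessarily the right fix) is to repartition the small seed RDD before the expensive flatMap so the work is spread over all executors:

// Spread the seed placements over all executor cores before the heavy flatMap;
// sc.defaultParallelism is used instead of a hard-coded 12.
val spread = initial.repartition(sc.defaultParallelism)
println("Partitions after repartition = %d".format(spread.partitions.size))
val resultSpread = spread.flatMap(x => placeQueensAt(1, Set(x))).collect()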
Word Count
val sparkConf = new SparkConf().setAppName("WordCount").set("spark.files.overwrite", "true")
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val lines = sc.textFile("hdfs:/user/wynadmin/sfpd.csv")
println("Patitions = %d".format(lines.partitions.size))
val words = for (line <- lines; word <- line.split(",") if word.toLowerCase.matches("[a-z]+")) yield (word, 1)
val counts = words.reduceByKey(_ + _)
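The 22 partitions here presumably come from the Hadoop input splits of sfpd.csv rather than from the core count; if more partitions are wanted, textFile also accepts a minimum as its second argument (a sketch):

// Ask for at least as many partitions as the cluster reports cores;
// textFile may still create more, depending on the input splits.
val linesMin = sc.textFile("hdfs:/user/wynadmin/sfpd.csv", sc.defaultParallelism)
println("Partitions = %d".format(linesMin.partitions.size))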
Environment
Spark 2.0.1 (3 nodes with 4 CPUs each) on Ubuntu 14.04.
Standalone deployment (neither YARN nor Mesos).
Update
Found the information in How-to: Tune Your Apache Spark Jobs (Part 2).
The spark.default.parallelism option fixed the symptom, although setting it to 12 (the same as the number of cores) led to uneven usage of the nodes.
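For reference, a sketch of setting that option programmatically (the value 12 is only an assumption taken from the 3 x 4 core figure in Environment); the same key can equally be put in conf/spark-defaults.conf or passed with --conf on spark-submit:

import org.apache.spark.{SparkConf, SparkContext}

// Default number of partitions used by parallelize and by shuffle
// transformations such as reduceByKey when none is given explicitly.
val sparkConf = new SparkConf()
  .setAppName("NQueen")
  .set("spark.default.parallelism", "12")
val sc = new SparkContext(sparkConf)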