Spark Course: Stack Overflow Assignment, Weeks 2 and 3

As you already know, I’ve enrolled in the Functional Programming in Scala Specialization on Coursera. I’m currently working through the fourth course of the series, “Big Data Analysis with Scala and Spark”. To be more precise, two days ago I completed the assignment for weeks #2 and #3. As usual, I’m going to share my experience.

By the way, if you are interested in my impressions of week #1 of the Spark course, you can read them later.

General impressions

Week #2 is dedicated to pair RDDs. They are a world of their own: compared to regular RDDs, you get a set of powerful functions that apply only to pair RDDs.

Wait a moment! Do you know what a pair RDD is? It’s something like a map data structure (dictionary, properties, etc.), except that the same key may appear more than once. In general, when your data is a collection of key-value pairs, you have a pair RDD in Spark terms.

So why are pair RDDs so special in Spark? Because they give you an easier way to operate on data, for example when you want to group or aggregate data by one of its properties. For these purposes, Spark has a special set of functions such as groupByKey, aggregateByKey and reduceByKey. I want to highlight join operations separately: pair RDDs allow you to apply SQL-like joins.
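
To make these functions concrete, here is a minimal sketch of reduceByKey and join on tiny pair RDDs. The object name PairRddBasics, the sample data and the local master setting are my own assumptions for illustration; they are not taken from the course material.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only
object PairRddBasics {

  // reduceByKey merges all values that share a key with the given function
  def sumPerKey(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
    pairs.reduceByKey(_ + _)

  // join is an inner join: only keys present in both RDDs are kept
  def innerJoin(left: RDD[(Int, String)],
                right: RDD[(Int, String)]): RDD[(Int, (String, String))] =
    left.join(right)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("PairRddBasics")
    val sc = new SparkContext(conf)
    try {
      val scores = sc.parallelize(Seq(("scala", 3), ("java", 1), ("scala", 4)))
      sumPerKey(scores).collect().foreach(println)

      val names  = sc.parallelize(Seq((1, "Ann"), (2, "Bob")))
      val cities = sc.parallelize(Seq((1, "Kyiv"), (3, "Lviv")))
      innerJoin(names, cities).collect().foreach(println)
    } finally sc.stop()
  }
}
```

Pair RDDs also offer leftOuterJoin, rightOuterJoin and fullOuterJoin when keys missing on one side should be kept.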

A small example. You have an RDD of persons and you want to group them by age. To do this with Spark, you need to (A) create a pair RDD where the age field plays the role of the key and the person itself plays the role of the value, and (B) apply the groupByKey function to that pair RDD.
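
The two steps can be sketched roughly like this. The Person class, the GroupByAge object and the sample data are assumptions I made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical model class, not part of the assignment
case class Person(name: String, age: Int)

object GroupByAge {

  // (A) key every person by age, (B) group the pairs by that key
  def byAge(persons: RDD[Person]): RDD[(Int, Iterable[Person])] =
    persons.map(p => (p.age, p)).groupByKey()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("GroupByAge")
    val sc = new SparkContext(conf)
    try {
      val persons = sc.parallelize(Seq(
        Person("Ann", 30), Person("Bob", 25), Person("Eve", 30)))
      byAge(persons).collect().foreach { case (age, group) =>
        println(s"$age -> ${group.map(_.name).mkString(", ")}")
      }
    } finally sc.stop()
  }
}
```

If all you need is an aggregate per age (a count, for instance), reduceByKey is usually the cheaper choice, because it combines values inside each partition before the shuffle.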

If you are an attentive and curious student, you will definitely learn all this from the video lectures. All the topics are explained pretty well.

Challenges

At the end of week #2 I came across an assignment based on a Stack Overflow dataset. From the first seconds of reading it, I was terrified: the assignment is about the k-means algorithm.

I thought: Again! Like in previous courses about Scala! K-means hell!

I needed to implement some of the functions required for the k-means algorithm to work correctly.

The second thing that confused me was the absence of unit tests, so I had no way to check the code I wrote.

So I started by writing some unit tests. Step by step, I implemented the functions…

  val testConf = new SparkConf().setMaster("local").setAppName("StackOverflowTest")
  val sc: SparkContext = new SparkContext(testConf)

  test("Grouping questions and answers together") {
    val postings = List(
      Posting(1, 1, None, None, 0, None),
      Posting(1, 2, None, None, 0, None),
      Posting(2, 3, None, Some(1), 2, None),
      Posting(2, 4, None, Some(1), 5, None),
      Posting(2, 5, None, Some(2), 12, None),
      Posting(1, 6, None, None, 0, None)
    )
    val rdd = sc.makeRDD(postings)
    val results = testObject.groupedPostings(rdd).collect()

    assert(results.size === 2)
    assert(results.contains(
      (1, Iterable(
        (Posting(1, 1, None, None, 0, None), Posting(2, 3, None, Some(1), 2, None)),
        (Posting(1, 1, None, None, 0, None), Posting(2, 4, None, Some(1), 5, None))
      ))
    ))
    assert(results.contains(
      (2, Iterable(
        (Posting(1, 2, None, None, 0, None), Posting(2, 5, None, Some(2), 12, None))
      ))
    ))
  }

  test("Maximum scores among answers per question") {
    val groupedQuestions = Seq(
      (1, Iterable(
        (Posting(1, 1, None, None, 0, None), Posting(2, 3, None, Some(1), 2, None)),
        (Posting(1, 1, None, None, 0, None), Posting(2, 4, None, Some(1), 5, None))
      )),
      (2, Iterable(
        (Posting(1, 2, None, None, 0, None), Posting(2, 5, None, Some(2), 12, None))
      )),
      (3, Iterable(
        (Posting(1, 6, None, None, 0, None), Posting(2, 7, None, Some(3), 2, None)),
        (Posting(1, 6, None, None, 0, None), Posting(2, 8, None, Some(3), 19, None)),
        (Posting(1, 6, None, None, 0, None), Posting(2, 9, None, Some(3), 10, None))
      ))
    )
    val rdd = sc.makeRDD(groupedQuestions)
    val results = testObject.scoredPostings(rdd).collect()

    assert(results.size == 3)
    assert(results.contains( (Posting(1, 1, None, None, 0, None), 5) ))
    assert(results.contains( (Posting(1, 2, None, None, 0, None), 12) ))
    assert(results.contains( (Posting(1, 6, None, None, 0, None), 19) ))
  }

  test("Vectors for clustering") {
    val questionsWithTopAnswer = List(
      (Posting(1, 1, None, None, 0, Some("Java")), 14),
      (Posting(1, 1, None, None, 0, None), 5),
      (Posting(1, 1, None, None, 0, Some("Scala")), 25),
      (Posting(1, 1, None, None, 0, Some("JavaScript")), 3)
    )

    val rdd = sc.makeRDD(questionsWithTopAnswer)

    val result = testObject.vectorPostings(rdd).collect()
    assert (result === Array((50000, 14), (500000, 25), (0, 3)))
  }

  //Use this test only when you want to check how fast your implementations are
  test("Speed test of Grouped Postings + Scored Postings") {
    val resourcesPath = getClass.getResource("/stackoverflow/stackoverflow.csv")
    println(resourcesPath)
    val raw     = testObject.rawPostings(sc.textFile(resourcesPath.toString))
    val grouped = testObject.groupedPostings(raw)
    val scored  = testObject.scoredPostings(grouped)

    // count() runs on the executors and avoids pulling every group into the driver
    assert(grouped.count() === 2121822L)

    scored.take(10).foreach(println)
  }

These tests are good only for general verification. They say little about how efficient the implementation is.

One interesting fact: I submitted 29 solutions before I achieved 10 out of 10. Intermediate results were 0, 3 and 4. The main problem was poor performance.

I read a lot of threads on the discussion forums, and it was really helpful: there were a lot of tips from other students and moderators. That is where I understood that the assignment requires knowledge from the week #3 video lectures.

Without knowing how Spark shuffling works internally and which partitioning strategies are available, I had no chance of completing the task. So I dived into the video lectures from week #3 and finally completed the k-means algorithm.
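
As a small illustration of why partitioning matters, the sketch below partitions a pair RDD once and then checks which transformations keep that partitioner. The PartitionerDemo object and the sample data are assumptions of mine; the point is only that mapValues keeps the partitioner while map drops it, so a later key-based operation after map has to shuffle again.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical demo object, for illustration only
object PartitionerDemo {

  // Partition once by key and keep the result in memory for reuse
  def prePartition(pairs: RDD[(Int, String)], parts: Int): RDD[(Int, String)] =
    pairs.partitionBy(new HashPartitioner(parts)).persist()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("PartitionerDemo")
    val sc = new SparkContext(conf)
    try {
      val partitioned = prePartition(
        sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))), 4)

      // mapValues cannot change keys, so the partitioner is kept
      println(partitioned.mapValues(_.toUpperCase).partitioner.isDefined)

      // map may change keys, so Spark drops the partitioner
      println(partitioned.map(identity).partitioner.isDefined)
    } finally sc.stop()
  }
}
```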

So my main advice is: be attentive!

Solutions

In this section, I want to share some tips that can be useful for solving the assignment. Since many people tried to steal the real solutions, I decided to modify them a little.

Let’s assume that we are working with an RDD of entities from Quora (a question-and-answer platform). All questions and answers are represented as QEntity:

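// postingType: 1 = question, 2 = answer ("type" is a reserved word in Scala, so it cannot be a field name)
case class QEntity(postingType: Int, id: Int, parentId: Option[Int], score: Int, tags: Option[String])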

In order to group all questions with their related answers, we need to do something like this:

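import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD

def groupedEntities(entities: RDD[QEntity]): RDD[(Int, Iterable[(QEntity, QEntity)])] = {
  // The source RDD is filtered twice, so cache it to avoid recomputation
  val cachedEntities = entities.cache()

  // Questions keyed by their own id
  val questionsMap = cachedEntities.filter(entity => entity.postingType == 1)
    .map(q => (q.id, q))
  // Answers keyed by the id of their question;
  // answers without a parentId are skipped instead of failing on .get
  val answersMap = cachedEntities.filter(entity => entity.postingType == 2)
    .flatMap(a => a.parentId.map(parentId => (parentId, a)))

  // One shared partitioner puts equal keys of both RDDs into the same partition
  val tuning = new RangePartitioner(8, questionsMap)

  val tunedQuestions = questionsMap.partitionBy(tuning).persist()
  val tunedAnswers = answersMap.partitionBy(tuning).persist()

  // Both sides are co-partitioned, so the join result keeps the partitioner;
  // repartitioning and persisting it again before groupByKey adds nothing
  tunedQuestions.join(tunedAnswers)
    .groupByKey()
}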

To find the highest-rated answer for each question, we do something like this:

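// entities is the RDD[QEntity] from the previous step; the val gets its own name
// so that it does not clash with the groupedEntities function
val grouped: RDD[(Int, Iterable[(QEntity, QEntity)])] = groupedEntities(entities)
grouped.values
  .map(iterable => (iterable.head._1, iterable.map(_._2)))
  .map(tuple => (tuple._1, findTheHighestScore(tuple._2.toArray)))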

Creating vectors from the questions and their highest-rated answers:

val cachedScored = scored.cache()
cachedScored.map(tuple => (firstWordInTag(tuple._1.tags, words), tuple._2))
  .filter(tuple => tuple._1.nonEmpty)
  .map(tuple => (tuple._1.get * wordSpread, tuple._2))

One step of the k-means algorithm:

val cachedVectors = vectors.cache()

val newMeans = means.clone()
cachedVectors.map(point => (findClosest(point, newMeans), point))
  .groupByKey()
  .mapValues(averageVectors(_))
  .collect()
  .foreach{
    case (index, newPoint) => newMeans.update(index, newPoint)
  }
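
The snippet above relies on two helpers, findClosest and averageVectors, whose bodies are not shown. Below is one possible way to write them in plain Scala, assuming that a point is a pair of Ints and that closeness is measured by Euclidean distance. The KMeansHelpers object and these implementations are my assumptions, not the assignment's actual code.

```scala
// Hypothetical implementations of the helpers used in the k-means step
object KMeansHelpers {
  type Point = (Int, Int)

  // Squared Euclidean distance; the square root is not needed for comparisons
  def squaredDistance(a: Point, b: Point): Double = {
    val dx = (a._1 - b._1).toDouble
    val dy = (a._2 - b._2).toDouble
    dx * dx + dy * dy
  }

  // Index of the center nearest to p (the first one wins on ties)
  def findClosest(p: Point, centers: Array[Point]): Int = {
    require(centers.nonEmpty, "at least one center is required")
    centers.indices.minBy(i => squaredDistance(p, centers(i)))
  }

  // Component-wise mean of the points, truncated to Int coordinates
  def averageVectors(points: Iterable[Point]): Point = {
    require(points.nonEmpty, "cannot average an empty cluster")
    var sumX = 0.0
    var sumY = 0.0
    var count = 0L
    points.foreach { case (x, y) =>
      sumX += x
      sumY += y
      count += 1
    }
    ((sumX / count).toInt, (sumY / count).toInt)
  }
}
```

Sums are accumulated as Double so that large clusters do not overflow Int before the division.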

And finally the median computation.

val median = closestGrouped.mapValues { vs =>
  // most common language in the cluster: pick the group with the largest count
  val langLabel: String   = langs(vs.groupBy(pair => pair._1)
    .map(pair => (pair._1, pair._2.size))
    .maxBy(pair => pair._2)._1 / langSpread)

  // percent of the questions in the most common language
  // (100.0 forces floating-point division; Int division would drop the fraction)
  val langPercent: Double = vs.count(_._1 == langs.indexOf(langLabel) * langSpread) * 100.0 / vs.size
  val clusterSize: Int    = vs.size
  val medianScore: Int    = medianCalculator(vs)

  (langLabel, langPercent, clusterSize, medianScore)
}
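
The medianCalculator helper is not shown above. A minimal sketch of how it could look, assuming each element of a cluster is a (language code, score) pair and that an even-sized cluster takes the integer average of its two middle scores, is given below. The ClusterStats object and both function names are hypothetical.

```scala
// Hypothetical helpers for summarising one cluster of (language code, score) pairs
object ClusterStats {

  // Median of the scores; for an even count the two middle values
  // are averaged with Int division
  def medianScore(vs: Iterable[(Int, Int)]): Int = {
    require(vs.nonEmpty, "cannot take the median of an empty cluster")
    val sorted = vs.map(_._2).toArray.sorted
    val mid = sorted.length / 2
    if (sorted.length % 2 == 1) sorted(mid)
    else (sorted(mid - 1) + sorted(mid)) / 2
  }

  // Most frequent language code together with the number of its occurrences
  def mostCommon(vs: Iterable[(Int, Int)]): (Int, Int) = {
    require(vs.nonEmpty, "cannot summarise an empty cluster")
    vs.groupBy(_._1).map { case (code, group) => (code, group.size) }.maxBy(_._2)
  }
}
```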

Summary

In the two days I spent on the assignment, I learned a lot of interesting things about Spark, definitely more than was covered in the video lectures. Probably the most useful discovery was the Spark UI. When you start running Spark code in the IDE, the logs show the URL of the UI, where you can find a lot of interesting and useful data about your jobs, their status, etc.
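
If you would rather not search the logs, the UI address can also be read from the SparkContext itself. This is a minimal sketch; the ShowSparkUi object and the fallback message are assumptions of mine.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, for illustration only
object ShowSparkUi {

  // uiWebUrl is None when the UI is switched off (spark.ui.enabled=false)
  def describeUi(sc: SparkContext): String =
    sc.uiWebUrl.getOrElse("Spark UI is disabled")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("ShowSparkUi")
    val sc = new SparkContext(conf)
    try {
      println(describeUi(sc))
    } finally sc.stop()
  }
}
```

The UI is served only while the SparkContext is alive, so for short local runs it helps to pause the program (for example on a breakpoint) before the context is stopped.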
