Spark Course: StackOverflow, weeks 2 and 3

So as you already know, I’ve enrolled in the Functional Programming in Scala Specialization on Coursera. Currently I’m working through the 4th course of the series, “Big Data Analysis with Scala and Spark”. To be more precise, two days ago I completed the assignment for weeks #2 and #3. As usual, I’m going to share my experience.

By the way, if you are interested in my impressions of week #1 of the Spark course, feel free to read that post as well.

General impressions

Week #2 is dedicated to pair RDDs. They are a separate world of data: compared to regular RDDs, you get a set of powerful functions that can be applied only to pair RDDs.

Wait a moment! Do you know what a pair RDD is? It’s something like a map data structure (dictionary, properties, etc.). In general, whenever your data comes in the form of key-value pairs, you have a pair RDD in the context of Spark.

So why are pair RDDs so special in Spark? Because they give you an easier way to operate on data, for example when you want to group or aggregate it based on some of its properties. For this purpose, Spark has a special set of functions such as groupByKey, aggregateByKey, reduceByKey, etc. Separately, I want to highlight join operations: pair RDDs allow you to apply SQL-like joins.
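
To illustrate the join part, here is a minimal sketch with made-up data (the RDD names are mine, and an already created SparkContext sc is assumed):

val userNames = sc.parallelize(Seq((1, "Alice"), (2, "Bob")))      // RDD[(Int, String)]
val userAges  = sc.parallelize(Seq((1, 33), (2, 27), (3, 41)))     // RDD[(Int, Int)]

// SQL-like inner join by key: only the ids present in both pair RDDs survive
val joined = userNames.join(userAges)                               // RDD[(Int, (String, Int))]
// joined contains (1, ("Alice", 33)) and (2, ("Bob", 27))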

A small example: you have an RDD of persons and you want to group these persons by their age. To do this with Spark, you need to (A) create a pair RDD, where the role of the key is played by the age field and the role of the value by the corresponding person, and (B) apply the groupByKey function to that pair RDD.
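
Here is a minimal sketch of exactly that (Person and the sample data are made up, and again a SparkContext sc is assumed):

case class Person(name: String, age: Int)

val persons = sc.parallelize(Seq(Person("Ann", 25), Person("Bob", 25), Person("Carl", 30)))

// (A) build a pair RDD: the age plays the role of the key, the person itself is the value
val personsByAge = persons.map(p => (p.age, p))

// (B) group all persons that share the same age
val grouped = personsByAge.groupByKey()   // RDD[(Int, Iterable[Person])]
// grouped contains (25, [Ann, Bob]) and (30, [Carl])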

If you are an attentive and curious student, you will definitely learn all of this from the video lectures. All topics are explained pretty well.

Challenges

At the end of week #2 I met an assignment about a StackOverflow dataset. From the first seconds of reading it, I was terrified: the assignment is about the k-means algorithm.

I thought: Again! Like in the previous courses about Scala! K-means hell!

I needed to implement some of the functions required for the k-means algorithm to work correctly.

The second thing that confused me was the absence of unit tests, so I was not able to check the code I wrote.

So I started by writing some unit tests. Step by step, I implemented the functions…

  // These tests live in a ScalaTest FunSuite; testObject is the StackOverflow
  // instance under test, defined elsewhere in the suite.
  val testConf = new SparkConf().setMaster("local").setAppName("StackOverflowTest")
  val sc: SparkContext = new SparkContext(testConf)

  test("Grouping questions and answers together") {
    val postings = List(
      Posting(1, 1, None, None, 0, None),
      Posting(1, 2, None, None, 0, None),
      Posting(2, 3, None, Some(1), 2, None),
      Posting(2, 4, None, Some(1), 5, None),
      Posting(2, 5, None, Some(2), 12, None),
      Posting(1, 6, None, None, 0, None)
    )
    val rdd = sc.makeRDD(postings)
    val results = testObject.groupedPostings(rdd).collect()

    assert(results.size === 2)
    assert(results.contains(
      (1, Iterable(
        (Posting(1, 1, None, None, 0, None), Posting(2, 3, None, Some(1), 2, None)),
        (Posting(1, 1, None, None, 0, None), Posting(2, 4, None, Some(1), 5, None))
      ))
    ))
    assert(results.contains(
      (2, Iterable(
        (Posting(1, 2, None, None, 0, None), Posting(2, 5, None, Some(2), 12, None))
      ))
    ))
  }

  test("Maximum scores among answers per question") {
    val groupedQuestions = Seq(
      (1, Iterable(
        (Posting(1, 1, None, None, 0, None), Posting(2, 3, None, Some(1), 2, None)),
        (Posting(1, 1, None, None, 0, None), Posting(2, 4, None, Some(1), 5, None))
      )),
      (2, Iterable(
        (Posting(1, 2, None, None, 0, None), Posting(2, 5, None, Some(2), 12, None))
      )),
      (3, Iterable(
        (Posting(1, 6, None, None, 0, None), Posting(2, 7, None, Some(3), 2, None)),
        (Posting(1, 6, None, None, 0, None), Posting(2, 8, None, Some(3), 19, None)),
        (Posting(1, 6, None, None, 0, None), Posting(2, 9, None, Some(3), 10, None))
      ))
    )
    val rdd = sc.makeRDD(groupedQuestions)
    val results = testObject.scoredPostings(rdd).collect()

    assert(results.size == 3)
    assert(results.contains( (Posting(1, 1, None, None, 0, None), 5) ))
    assert(results.contains( (Posting(1, 2, None, None, 0, None), 12) ))
    assert(results.contains( (Posting(1, 6, None, None, 0, None), 19) ))
  }

  test("Vectors for clustering") {
    val questionsWithTopAnswer = List(
      (Posting(1, 1, None, None, 0, Some("Java")), 14),
      (Posting(1, 1, None, None, 0, None), 5),
      (Posting(1, 1, None, None, 0, Some("Scala")), 25),
      (Posting(1, 1, None, None, 0, Some("JavaScript")), 3)
    )

    val rdd = sc.makeRDD(questionsWithTopAnswer)

    val result = testObject.vectorPostings(rdd).collect()
    assert (result === Array((50000, 14), (500000, 25), (0, 3)))
  }

  // Use this test only when you want to check how fast your implementations are
  test("Speed test of Grouped Postings + Scored Postings") {
    val resourcesPath = getClass.getResource("/stackoverflow/stackoverflow.csv")
    println(resourcesPath)
    val raw     = testObject.rawPostings(sc.textFile(resourcesPath.toString))
    val grouped = testObject.groupedPostings(raw)
    val scored  = testObject.scoredPostings(grouped)

    assert(grouped.collect().size === 2121822)

    scored.collect().take(10).foreach(println)
  }

These tests are good only for general verification; in terms of the algorithm’s efficiency, they are not that useful.

One interesting fact: I submitted 29 solutions before I achieved 10 out of 10. The intermediate results were 0, 3 and 4 points. The main problem was poor performance.

I read a lot of threads on the discussion forum, and it was really helpful. There were a lot of tips from other students and moderators, and there I understood that the assignment requires knowledge from the week #3 video lectures.

Without knowledge of Spark’s shuffling internals and partitioning strategies, there was no chance for me to complete the task. So I dived into the video lectures of week #3 and finally completed the k-means algorithm.
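
To give an idea of what that knowledge changes in practice, here is a small sketch of two generic techniques of this kind (the data and names are made up, and a SparkContext sc is assumed): reduceByKey pre-aggregates on each partition before the shuffle, while groupByKey ships every single value across the network; and pre-partitioning both sides of a join avoids repeated shuffles.

import org.apache.spark.HashPartitioner

// made-up pair RDDs: (questionId, score) for questions and for their answers
val questionPairs = sc.parallelize(Seq((1, 0), (2, 3), (3, 1)))
val answerPairs   = sc.parallelize(Seq((1, 5), (1, 2), (2, 12)))

// counting answers per question: only partial counts travel over the network
val answersPerQuestion = answerPairs.mapValues(_ => 1).reduceByKey(_ + _)

// both sides share one partitioner and are persisted,
// so the join itself does not trigger another full shuffle
val partitioner = new HashPartitioner(8)
val questions = questionPairs.partitionBy(partitioner).persist()
val answers   = answerPairs.partitionBy(partitioner).persist()
val joined    = questions.join(answers)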

So my main advice is: be attentive!

Solutions

In this section, I want to share some tips that can be useful for solving the assignment. Since many people tried to steal the real solutions, I decided to modify them a little.

Let’s assume that we are working with an RDD of entities from the Quora service (a question-and-answer platform). All questions and answers are represented as QEntity:

case class QEntity(postingType: Int, id: Int, parentId: Option[Int], score: Int, tags: Option[String])

In order to group all questions together with their related answers, we need to perform something like this:

import org.apache.spark.RangePartitioner
import org.apache.spark.rdd.RDD

def groupedEntities(entities: RDD[QEntity]): RDD[(Int, Iterable[(QEntity, QEntity)])] = {
  val cachedEntities = entities.cache()

  // questions keyed by their own id, answers keyed by the id of their parent question
  val questionsMap = cachedEntities.filter(entity => entity.postingType == 1)
    .map(q => (q.id, q))
  val answersMap = cachedEntities.filter(entity => entity.postingType == 2)
    .map(a => (a.parentId.get, a))

  // one shared partitioner keeps matching keys on the same partitions
  val tuning = new RangePartitioner(8, questionsMap)

  val tunedQuestions = questionsMap.partitionBy(tuning).persist()
  val tunedAnswers = answersMap.partitionBy(tuning).persist()

  tunedQuestions.join(tunedAnswers)
    .partitionBy(tuning)
    .persist()
    .groupByKey()
}

When we want to find the highest-rated answer for each question, we do something like this:

val grouped: RDD[(Int, Iterable[(QEntity, QEntity)])] = groupedEntities(entities)
grouped.values
  .map(iterable => (iterable.head._1, iterable.map(_._2)))
  .map(tuple => (tuple._1, findTheHighestScore(tuple._2.toArray)))

Creating vectors from the questions and their highest-rated answers:

val cachedScored = scored.cache()
cachedScored.map(tuple => (firstWordInTag(tuple._1.tags, words), tuple._2))
  .filter(tuple => tuple._1.nonEmpty)
  .map(tuple => (tuple._1.get * wordSpread, tuple._2))

K-means algorithm.

val cachedVectors = vectors.cache()

// one k-means iteration: assign every vector to its closest mean,
// then move each mean to the average of the vectors assigned to it
val newMeans = means.clone()
cachedVectors.map(point => (findClosest(point, newMeans), point))
  .groupByKey()
  .mapValues(averageVectors(_))
  .collect()
  .foreach{
    case (index, newPoint) => newMeans.update(index, newPoint)
  }

And finally the median computation.

val median = closestGrouped.mapValues { vs =>
  // the most common language in the cluster
  val langLabel: String   = langs(vs.groupBy(pair => pair._1)
    .map(pair => (pair._1, pair._2.size))
    .maxBy(pair => pair._2)._1 / langSpread)

  // percent of the questions in the most common language
  val langPercent: Double = vs.map(_._1).count(_ == langs.indexOf(langLabel) * langSpread) * 100.0 / vs.size
  val clusterSize: Int    = vs.size
  val medianScore: Int    = medianCalculator(vs)

  (langLabel, langPercent, clusterSize, medianScore)
}

Summary

Within the 2 days of my attempts to complete the assignment, I learned a lot of interesting things about Spark, definitely more than was covered in the video lectures. Probably the most useful discovery is the Spark UI. When you start Spark code execution from the IDE, you can find the URL of the UI in the logs, and it shows a lot of interesting and useful data about your jobs, their status, etc.
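
For example, here is a minimal sketch (not from the assignment) of how to keep a local application alive long enough to browse that UI, which by default listens on http://localhost:4040:

import org.apache.spark.{SparkConf, SparkContext}

object SparkUiDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("SparkUiDemo"))

    // any job at all, just to have something to inspect on the Jobs and Stages tabs
    println(sc.parallelize(1 to 1000000).map(_ * 2).sum())

    // the driver log prints the exact UI address at startup
    println("Press ENTER to stop the application (the UI dies together with the SparkContext)")
    scala.io.StdIn.readLine()
    sc.stop()
  }
}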

About The Author

Mathematician, programmer, wrestler, last action hero... Java / Scala architect, trainer, entrepreneur, author of this blog

  • Pingback: Big Data Analysis with Scala and Spark week 1: Wikipedia

  • Gadek

    your solutions don’t work on the coursera grader, how is that?

    • If so, how did I get 10 out of 10? 😀
      Moreover, I warn readers not to copy and paste the solutions.

      • Gadek

        you are caching a lot, even if it makes no sense. Caching like that must end up with an OOM exception.

  • Gulshan

    Where is the medianCalculator function in your code?

    • It’s hidden 🙂

      • Gulshan

        can you provide that function? 🙂

      • Beyhan Gül

        wtf? why is that hidden? That code is trash without it.

        • alpoza

          There is one easy and clever implementation in the course forums:

          def computeMedian(s: Seq[Int]) = {
            val (lower, upper) = s.sortWith(_ < _).splitAt(s.size / 2)
            if (s.size % 2 == 0) (lower.last + upper.head) / 2 else upper.head
          }

  • Volodymyr Miz

    Thanks for the tests, Alex.

  • alpoza

    Hi, thanks for the hints 😉

    I think your maps are breaking your partitions. You are caching so much. I think you only need to cache the vector used in the k-means recursive function.
