Using Spark and Hive to process BigData at Conviva

Dilip Joseph

Conviva monitors and optimizes tens of millions of online video streams daily for premium video brands. Through Conviva Pulse, our online video dashboard, customers analyze how their online video is being consumed. For example, customers can identify, in real time, the most popular videos being watched and adjust their advertising strategy accordingly. As another example, customer ops teams can detect, in real time, problems degrading the experience of users watching a live basketball game (say, high buffering due to an overloaded CDN) and quickly take corrective action before the game ends. In addition to live monitoring, our customers can also analyze historical video trends – what were the most popular videos last week, and how long were they watched on average?

Our customers also ask questions that require very deep and often ad-hoc analysis – “Something seems to be wrong with my video delivery last week. Any idea what is going on?”  The video analysis team at Conviva digs through terabytes of data to provide detailed responses to such questions. This post describes how we use Hive and Spark to make this happen.

First, here is an overview of how data flows through the Conviva infrastructure. Conviva monitoring and optimization code embedded within our customers’ video players collects non-personally identifiable information such as buffering time and frame rate, and sends it to Conviva’s backend infrastructure every few seconds. The data entering our backend is replicated into two streams. One stream goes into our custom live data processing stack written in Java. This live system solves the challenging task of analyzing millions of concurrent video streams with one-second latency, and deserves its own blog post. The other stream is written to our Hadoop Distributed File System (HDFS) cluster. MapReduce jobs summarize this raw data and extract the historical trending information shown in Pulse. The raw data written to HDFS, together with the summaries calculated by the MapReduce jobs, constitutes the input to our Hive and Spark based offline reporting and ad-hoc analysis infrastructure, described next.

 

We first started with Hive. Hive provides a SQL-like interface on top of data stored in HDFS. For example, we can find the number of sessions per video on a particular day with the following Hive query:


SELECT videoName, COUNT(1)
FROM summaries
WHERE date='2011_12_12' AND customer='XYZ'
GROUP BY videoName;

Our analysis team uses Hive queries to dig deep into the data and answer customer questions. This process often involves tens of ad-hoc queries, many of them long enough to fill multiple A4 sheets of paper. The most commonly used queries are packaged together into reports on our internal analysis web portal so that engineers and customer service representatives can easily run them. Our analysis team also uses Hive queries to measure and improve the performance of our video optimization algorithms, and to gain valuable insights into the online video ecosystem and user behavior. We published some of our findings in the research paper “Understanding the Impact of Video Quality on User Engagement” at SIGCOMM 2011.

Hive works great for performing ad-hoc queries against large amounts of data stored in HDFS. The only problem is that queries often take a long time, as they must read data from HDFS. Most of the time, we run multiple queries against the same data set, and Hive re-reads the data from HDFS each time. This is wasteful and slow, and it is where Spark helps.

Spark is an in-memory distributed computing framework being developed at UC Berkeley. Spark allows us to load the data of interest from HDFS (or any persistent storage) into RAM across multiple servers and cache it. We can then perform multiple queries against the cached data. Since the data is in RAM, queries are super quick. If a node dies, Spark automatically reconstructs the data from persistent storage.

Queries in Spark are not SQL-like queries as in Hive. Instead, they are written in Scala. The following Spark query produces the same results as the video count Hive query we saw earlier.


// Read the session summaries stored on HDFS as a Hadoop SequenceFile; the summary is
// the key and the value is an unused NullWritable. fieldsOfInterest keeps only the
// fields needed for this query, to conserve memory.
val sessions = sparkContext.sequenceFile[SessionSummary, NullWritable](pathToSessionSummaryOnHdfs,
                   classOf[SessionSummary], classOf[NullWritable]).flatMap {
                       case (summary, _) => summary.fieldsOfInterest
               }
// Keep only the sessions for the desired day and customer, and cache the working set in RAM.
val cachedSessions = sessions.filter(whereConditionToFilterSessionsForTheDesiredDay).cache
// Count sessions per video: emit (videoName, 1) for each session and sum the counts per key.
val mapFn : SessionSummary => (String, Long) = { s => (s.videoName, 1L) }
val reduceFn : (Long, Long) => Long = { (a, b) => a + b }
val results = cachedSessions.map(mapFn).reduceByKey(reduceFn).collectAsMap

The above Scala code first reads the session summaries from HDFS, keeping only the subset of fields needed for the query in order to conserve memory. We then filter out sessions that do not match the day and customer of interest. This is our working set, and we instruct Spark to cache it. Finally, we kick off the group-by-videoName query, defined as a map function and a reduce function. At this point, Spark reads the data from HDFS, caches it, executes the map and reduce steps, and produces the results.

If we run just one query, Spark is not much faster than Hive; reading the data from HDFS is the bottleneck in both cases. Most of the time, however, we have to issue multiple queries on the same data – say, we want to find the video counts per country, state, and city in addition to per video name. In that scenario, Spark is much faster than Hive: since the data is already cached in memory, subsequent queries complete very quickly (see the sketch below). One of our reports, which produces a detailed analysis of video data for a geographic region, took 24 hours to run using Hive. After converting it to Spark, the same report runs in under 45 minutes. These tremendous savings have allowed us to run more reports than we otherwise could on our existing Hadoop cluster. Today, over 30% of the reports we run on a daily basis use Spark.
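
To make the reuse concrete, here is a minimal sketch of one such follow-up query run against the cached working set from the example above. The country field on SessionSummary is an assumption for illustration; only videoName appears in the earlier code.

// A follow-up query over the already-cached sessions: count sessions per country.
// No HDFS read happens here, because cachedSessions already lives in RAM.
// The country field is assumed for illustration; the full SessionSummary schema is not shown above.
val countryCounts = cachedSessions
    .map { s => (s.country, 1L) }   // emit (country, 1) for each session
    .reduceByKey(_ + _)             // sum the counts per country
    .collectAsMap

Queries for state and city follow the same pattern, each reusing the cached data instead of rescanning HDFS.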

For simple queries, writing the query in Spark is harder than writing it in Hive. However, it is much easier to write complicated queries in Spark than in Hive, and most real-world queries tend to be quite complex; this is where Spark shines. We can leverage the full power of the Scala programming language rather than relying on the limited syntax offered by SQL. For example, the Hive expression IF(os=1, “Windows”, IF(os=2, “OSX”, IF(os=3, “Linux”, “Unknown”))) can be replaced by a simple match clause in Scala, as sketched below. We can also use any Java/Scala library to transform the data. Moreover, an ongoing project at UC Berkeley called Shark will soon make it possible to run Hive queries on top of Spark.
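
A sketch of that match clause, using the integer codes from the Hive expression above:

// Map the numeric os code to a readable name; the codes come from the Hive IF expression above.
def osName(os: Int): String = os match {
  case 1 => "Windows"
  case 2 => "OSX"
  case 3 => "Linux"
  case _ => "Unknown"
}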

Overall, Spark has been a great boon for data processing operations at Conviva. We are currently working on leveraging Spark in many more ways, including real-time analysis and anomaly detection. If you are interested in working on large-scale data processing systems, we are hiring.