Smart Scaling of Unique-Based Metrics Using HyperLogLog


Here at Conviva, we empower top media publishers by providing them with rich real-time insights on their video streaming traffic. We compute different Quality of Experience (QoE) metrics like RebufferingRatio, AverageBitrate, VideoStartupTime and many more. A critical metric for our customers is the number of UniqueViewers (Daily/Monthly Unique), which happens to be one of the few metrics that cannot be directly added up. For example, we cannot simply add the number of unique viewers for each day of the week to get the number of unique viewers for the entire week.

Blog 1

The truly accurate way is to maintain the sets of unique ids for each day and then find the cardinality of the union of the sets. This is not feasible computationally or storage-wise, since we typically see millions of unique viewers each day. Instead, we decided to use the relatively recent HyperLogLog algorithm, which provides an estimation of the cardinality of a set and, more importantly, is ‘aggregatable’.

What is HyperLogLog

HyperLogLog provides a very accurate estimate of the cardinality using as little space as possible by using the simple yet very powerful idea of uniform distributions. Essentially, given a uniform distribution of N 0s and 1s, we expect that:

  • half the numbers will start with 1
  • a fourth will begin with 01
  • an eighth will start with 001

…and so on. In other words, the leading run of zeros gives us an idea of how many unique numbers we should sample from the uniform distribution to get the pattern we want. If we want a number starting with 1 (i.e. no leading zeros), then we should sample at least twice. Or, if we want a number starting with 001 (two leading zeros), we should sample 8 times or more. Flipping this logic around, if we have taken a bunch of unique samples, and the longest leading run of zeros is 2, then we must have sampled about 8 times. HyperLogLog builds on this logic, using a hash function to map the input to a uniformly random bit-stream, creating a “sketch” using multiple registers and employing harmonic mean to arrive at a much better estimate. For more details, please read the paper referenced above and fiddle around with this excellent interactive tool from Neustar Research (formerly Aggregate Knowledge) to watch HyperLogLog in action.

blog 2

Due to the way HyperLogLog is implemented by using multiple registers, we can easily estimate the cardinality of the union of sets by adding the sketches together, provided they all use the same number of registers. To add two sketches, we simply look at each register position in the two sketches and take the maximum of two values.

blog 3

The cardinality of the intersection of sets can also be estimated by using the formula:

|A n B| = |A| + |B| - |A u B|

The HLL algorithm provides the error bound as 1.04/√m , where m is the number of registers to be used. Typically, m is chosen to be a power of 2. So, we get error = 1.04 / 2^(b/2). The following table gives the error rate and storage requirements for different choices of b (each register requires a byte of storage). Standard implementations of HyperLogLog have some optimizations for small cardinalities, so the size on disk may be a little less. We use Twitter’s implementation from their algebird package.

B
ERROR
SIZE
12 1.6250 % 4 KiB
13 1.1490 % 8 KiB
14 0.8125 % 16 KiB
15 0.5745 % 32 KiB
16 0.4063 % 64 KiB

Conviva has built an industry reputation on metric and data accuracy. So while we wanted to make use of this efficient algorithm, we still decided to pay what others might consider a heavier price for relative accuracy. We decided to use b = 16, trading off less efficient storage for a tighter bound on relative error. Others may choose to go for a much more memory efficient distinct-value computation solution by selecting a smaller number of registers.

We traditionally used large replicated MySQL instances for storing all the metrics, but HyperLogLog presented us with multiple challenges:

  • It is not easy to use java code in stored procedures for adding and approximating unique viewers from binary HyperLogLog data
  • Even with just 8 byte longs for metrics, our tables were already hitting 100 GiB mark, so adding a column with kilobytes of data to each row quickly became impossible
  • Adding such large binary arrays in a single machine is a poor use of resources and a bottleneck

It quickly became very apparent that we had to go to a distributed storage system like Apache HBase. But, we also wanted an SQL like interface so that we could easily aggregate the metrics on the server, instead of writing our own code to scan the table, fetch the data, group them and then add them up. And right on cue, along came Apache Phoenix.

Aggregation and scaling using Phoenix over HBase

Phoenix, originating at Salesforce, is a java library that sits on all the HBase masters and region-servers. It uses HBase coprocessors to provide an SQL interface on top of HBase. It also contains a client side library for performing final aggregations of data returned by the coprocessors. Phoenix also uses optimizations for pre-splitting tables based on hash of the key, and for performing intelligent scans to ensure fast response times. It also provides a good framework for adding custom functions for aggregating data, which we used to add functions to aggregate and estimate HyperLogLog from binary data in the tables.

Aggregate HyperLogLog
/**
 * Custom aggregator to merge HyperLogLog data (VARBINARY -> VARBINARY)
 */
public class ApproxUniqAggregator extends BaseAggregator {
    ...
    @Override
    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
        byte[] buffer = new byte[ptr.getLength()];
        System.arraycopy(ptr.get(), ptr.getOffset(), buffer, 0, ptr.getLength());
        HLL thisHll = HyperLogLog.fromBytes(buffer);
        if (aggHLL == null) {
            aggHLL = thisHll;
        } else {
            int aggBits = aggHLL.bits();
            int thisBits = thisHll.bits();
            if (thisBits == aggBits) {
                aggHLL = thisHll.$plus(aggHLL);
            } else if (thisBits < aggBits) {
                aggHLL = thisHll;
            }
        }
    }
    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        if (aggHLL == null) {
            ptr.set(PDataType.NULL_BYTES);
        } else {
            ptr.set(HyperLogLog.toBytes(aggHLL));
        }
        return true;
    }
    ....
}
Estimate Cardinality from HyperLogLog data
/**
 * Estimate the cardinality from HyperLogLog data
 */
public class HllConvertFunction extends ScalarFunction {
    ....
    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        if (!getChildExpression().evaluate(tuple, ptr)) {
            return false;
        }
        byte[] source = new byte[ptr.getLength()];
        System.arraycopy(ptr.get(), ptr.getOffset(), source, 0, ptr.getLength());
        ptr.set(PDataType.LONG.toBytes(HyperLogLog.fromBytes(source).approximateSize().estimate()));
        return true;
    }
    ....
}

Now that we had the framework ready for saving, aggregating and querying the HyperLogLog data for estimating the unique viewers for our customers, all that remained was a component that used the Phoenix client library efficiently and could handle the load of multiple simultaneous queries. Although we had an existing Java component for querying MySQL database, we decided to write a new component, partly to leverage the capabilities of Scala, but mostly to learn from the design mistakes of the current component to create a more scalable and maintainable one.

Putting it all together (or, nothing works out of the box)

With all of the code and infrastructure in place, it was time to put everything to test. An early setback that we faced was due to HBASE-10312, which crippled our HBase data loading components, sending them into stack overflow frenzies. With no immediate patch in sight, we decided to alter our data loading strategy by adding more batching and reducing the parallelism. This actually ended up improving the through-put due to reduced overhead, and better network utilization with larger batch sizes.

We also used the short circuit local reads optimization to improve the read throughput by letting the hbase regionservers directly access data on disk, instead of going through the local datanode. Increasing the number of handler counts on both the datanodes and regionservers allowed us to handle more queries in parallel.

Salting is another important optimization in Phoenix. It splits the table into preset regions, and distributes the regions across the HBase regionservers. This allows queries to run in parallel on the entire cluster, significantly reducing the response time of the queries. As we had experienced earlier, over-parallelism can be very detrimental. So we analyzed our tables and set up appropriate number of salt buckets for the tables, depending on the amount of data, types of queries run and the frequency of their use. With HBase, choosing the right primary key is also very important because, unlike MySQL, it is not so simple to create secondary indices, unless we are prepared to pay for the storage and query overhead associated with it. So we looked at our query patterns, particularly the WHERE clauses and GROUP BY columns to come up with a key to satisfy our use cases with minimal overhead.

An unexpected problem we faced when querying for HyperLogLog data over long ranges of time was the amount of data being transferred from the regionservers to our Scala component using the Phoenix client, especially for queries that involved group-bys. We reduced the data being fetched by using Sub-Queries to filter out the rows we did not need. We also reduced the number of threads used to process each region in HBase. The default (100) was too high and left too much for the client to aggregate, so we dialed it down to 10.

Conclusion

We started with the desire to provide our customers with aggregatable unique viewers over arbitrary time ranges, and ended up with a whole new distributed data access and storage stack with low latency query response times. We used HyperLogLog to estimate the unique viewers, HBase with Phoenix to solve the storage and query-time processing requirements of HyperLogLog, and wrote a new component in Scala using Futures to provide a scalable solution for querying the data.

Everyday multiple Engineering teams in Conviva are trying to use cutting edge technology to come up with innovative solutions to tough problems. We are still small but the impact potential is large. If you are interested in such work, check out our current openings at the Conviva Careers page. We would love to hear from you!