9.2.4 Preaggregation
-------------------------------------------------------------------------------- EVENTQL - PRE-AGGREGATION -------------------------------------------------------------------------------- v0.8 - September, 2016 Paul AsmuthTable of Contents 1. Introduction 2. Definitions 3. Design Overview 4. Implementation Details 4.1 Table Eligibility 4.2 Configuration 4.3 Aggregation Functions 4.4 Pre-Aggregation Procedure 4.5 Leader Election 4.6 Load Balancing 4.7 Changes to the ALTER statement 4.8 Changes to the INSERT statement 4.9 Changes to the SELECT statement 5. Reliability Considerations 6. Alternatives Considered 6.1 Implement as standalone service 6.2 Implement in the compaction routine 7. Code Locations 1. Introduction One of the primary usecases of EventQL is multidimensional analysis of timeseries data. These type of analyses usually involve a table that is indexed by a DATETIME primary key and contains a number of other columns that can be classified into "dimensions" and "measurements". Dimensions are columns that may be referred in the GROUP BY statement of queries on the data. Measurements are (usually numeric) columns that will only be used in aggregations (i.e. from aggregate function in the SELECT list). For this usecase it's usually highly beneficial to pre-aggregate the measurements for each input dimension over a recurring time interval before writing them to disk. To see why this may result in huge performance increases, consider this somewhat realistic usecase: We're running an ad network and are displaying 100,000 ad impressions per second. We want to measure the rate at which these ads are clicked with 1-minute granularity. To make things a bit more interesting, we also want to be able to get an individual click rate for each of the websites on which our ads are displayed. If our ads are displayed on 4,000 individual websites at any given time, the examples works out so that the size of the data after pre-aggreagtion would be roughly 0.07% of the size of the input data. A 1500x speedup. 2. Definitions We define an abstract pre-aggregation routine, that accepts a rowset N of rows for a given input time interval and produces another rowset M, so that any query that computes aggregations on the measurements (and optionally groups by time or one of the dimensions) will return the same result for both input rowsets N and M. The number of rows in M will always be less or equal to the number of rows in N. The length of the recurring aggregation time interval represents a lower bound on the time granularity over which the data can be grouped in queries. Hence we'll refer to the length of an individual aggregation time interval as the "granularity" of the aggregation from now on. We also define the "cardinality of an aggregation" for a given time interval as the number of rows in the output rowset M. Given an arrival rate of events R and a granularity G, the pre-aggregation function transforms R * G input rows into C output rows where C is the cardinality of the aggregation. 3. Design Overview The proposed approach is to implement pre-aggregation as a separate internal subsystem, touching as few of the core subsystems as possible. Pre-aggregation is optionally enabled for a table by setting a pre-aggregation config for the table (further specified in section 4). Once enabled, we elect N hosts as "aggregation nodes" for the table. Each insert into the pre-aggregation-enabled table will initially bypass the normal insert code and will instead be sent to one of the N aggregation hosts. Each aggregation node then performs the pre-aggreagtion routine locally on all received input rows and, at the end of the aggregation interval, forwards the output rows to the regular table insert procedure. Of course, this scheme will result in up to N * C instead of C output rows. While this would not impact the correctness of any aggregate queries over the data, it would still be visible to the user when doing a simple select on the table. For UX reasons, and because we consider it an implementation detail that should not have to be explained in user-facing documentation, we automatically insert a default aggregation clause into every SELECT select statement which refers to pre-aggregation-enabled tables. This default aggregation clause is statically derived from the pre-aggregation config. 4. Implementation Details 4.1 Table Eligibility Pre-aggregation may only be enabled for tables that meet the following requirements: - Must have a non-unique primary key of time DATETIME 4.2 Configuration We add the following fields ("the pre-aggregation config") to the table configuration (which in turn is stored in the coordination service). - granularity -- The aggregation granularity - num_servers -- The number of servers that should run the aggr. - dimensions -- A list of dimensions, for each dimension: - dimensions.column -- The column name for the dimension - measures -- A list of measures, for each measure: - measure.column -- The column name for the measure - measure.aggr_fn -- The aggregation function to be used The pre-aggregation config may be changed by the user at any time. On any change to either the pre-aggregation config or the table schema the server must check and update the pre-aggregation config in such a fashion that it never refers to non-existing columns. 4.3 Aggregation Functions An aggregation function is responsible for converting a number of scalar input values for a given measure, dimension and time interval into a single scalar aggregated value that will be stored into one column of the output row. Aggregation functions may optionally take a number of configuration parameters. The parameters are encoded as = pairs where key and value are both strings. Additionally, not every aggregation function is required to support every (output) column type. The following is not meant as a definite list of supported aggregation functions but mereley as the list of initially considered and supported methods: - COUNT - SUM - MEAN - MIN - MAX - DELTA - CARDINALITY_APPROX - QUANTILES - QUANTILES_APPROX 4.4 Pre-Aggregation Procedure The pre-aggregation procedure is run for a specific table and on a specific host. All the work it performs is done on the local machine and all data is kept in memory. Conceptually, the procedure stores a map of running aggregations. Each entry in the map contains a vector of aggregation states for each measure. The key for the map is another vector of strings. The key vector has one element for each dimension in the aggregation config plus one element for the time. The elements for the dimensions are set to the values of the respective columns in the input row (or NULL if the input row is missing the column). The time element is set to quotient of the input row's primary key column value and the granularity (using truncating division). Input rows without a primary key column value are rejected. For each incoming row, the matching entry is created or retrieved from the map of running aggregations. Then, the aggregation function for each measure is called with the corresponding column's value from the input row and the last aggregation state from the map entry. Finally, the result of the invocation is written back to the aggregation state in the map entry. Once a new entry is added to the aggregation map a timer is started that fires after G (granularity) seconds have passed one the wall clock. Once the timer fires, the entry is removed from the map and inserted into the table using the regular table insert mechansim (using the nominal time retrieved from the map's key as the rows primary key value). 4.5 Leader Election We use the coordination service to elect the N aggregating servers from the list of available servers for every table. The aggregating servers may, in principle, be randomly chosen from all live servers in the cluster. The implementation of the election routine is pluggable (as is the coordination service) and not in the scope of this document. 4.6 Load Balancing Each input row should be sent to one of the aggregating servers for the table in such a fashion that the distribution of rows over servers will be approximately uniformly. This could be implemented as a random pick for every row or a simple round-robin scheme. However, we're explicitly _not_ using a consistent assignment scheme based on the rows dimension as that would open the system up to hotspotting issues if the distribution of the input data is not uniform (which is likely). 4.7 Changes to the ALTER statement The ALTER TABLE must be changed to atomically remove columns from the pre-aggregation config when columns are removed from the table schema. Forthermore, the new ALTER TABLE SET PREAGGREGATION statement must be implemented. 4.8 Changes to the INSERT statement The INSERT statment must be modified to check if the table is pre-aggregation-enabled and if so, divert the insert into the pre-aggregation subsystem. A new flag INSERT_NOPREAGGR is added that optionally disabled this behaviour. 4.9 Changes to the SELECT statement The SELECT statement execution must be altered to insert an aggregation node for every query on a pre-aggregation-enabled table that does not already contain an eligible aggregation clause. The synthetic aggregation node is a group over the quotient of primary key column value and granularity (using truncating division) where all columns referred to from measurement definitions are aggregated using the respective aggregation function's output merge function. 5. Relability Considerations The proposed implementation has no provisions to handle pending data on server shutdown or server crashes, so in these events some input rows will be lost. As part of the separate writea-ahead-logging effort, we will add (optional) durable write-ahead storage for accepted input rows. A separate issue that is not addressed is that the API and implementation do not allow for exactly-once-inserts in the case of failures: If the client recevies an error code after sending an insert to one of the aggregating servers, it can't be sure if the value was stored or not so it can't correctly decide if it should retry or not. However, this is really more of a philosophical concern (solving the problem would defeat a large part of the benefits of pre-aggregation). Users that need strict exactly-once semantics can not use this optimization and should insert the rows individually, without pre-aggregation (which _is_ an idempotent operation and safe to retry). 6. Alternatives Considered 6.1 Implement as standalone service On first look, pre-aggregation might seem like a feature that does not belong in the core database, but could be provided as a separate service that performs the pre-aggregation and then simply stores the aggregated rows into the database. However, with this scheme it would be painful to support more complex aggregation modes, like estimating the number of unique strings or estimating quantiles. Both of these problems would be easily solvable using probabilistic data structures, but if the pre-aggregation was performed in a standalone service we would have to precompute and serialize these data structures before sending them to the database. And we can't treat the data as binary blobs in the database if we want to run useful (and fast) queries on it, so this would leave us with one of two choices: Either duplicate all data strcture code from the database into the standalone service or a fat client and then make sure that both copies always match 100% so the serialization works (which would be a PITA). Or alternatively, keep the code only in the database and send all the raw values to the database as part of the aggregated row (which would defeat the purpose of streaming aggregation). 6.2 Implement in the compaction routine One downside of the proposed approach is that with constant arrival rate R and granularity G, the number of stored rows for a given input stream still scales linearly in the number of hosts we assign to the stream (and therefore, strictly speaking also in the arrival rate R). One alternative solution, implementing the pre-aggregation in the partition compaction routine, avoids this and ends up with a constant number of stored rows regardless of the number of servers assigned to the aggregation. However, in the interest of keeping the design simple and minimally invasive to the other subsystems, modifying the compaction routine is left open for when/if the issue turns out to be a practical limitation. 7. Code Locations FIXME