Flux Aggregation in InfluxDB: Now or Later

Aggregations are a powerful tool when processing large amounts of time series data. In fact, most of the time you’re going to care more about the min, max, mean, count or last values of your dataset than you will about the raw values you’re collecting.

Knowing this, InfluxDB and the Flux language make it as easy as possible to run these aggregations, whenever and wherever you need to, and sometimes that leads people to running them in ways that aren’t as efficient as they could be. Here are some ways to ensure your aggregation query runs as fast as possible.

Don't aggregate too early

As great as aggregate functions are and as much as you want to use them, be careful not to use them too soon. Oftentimes, we’ll see somebody with a query like this:

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> yield(name: "mean")

While this query will return the exact results you want, in this case the average total CPU for a specific host, it’s also doing a lot more work than it needs to.

Because the aggregateWindow function is being called before the filter calls on host and cpu, InfluxDB ends up calculating the average for all host and cpu values first, and then dropping the raw data after it’s done all that hard work.

Instead, perform all the filtering you can before using an aggregate function. This reduces the total amount of data going into those calculations, which will give your query a big speed boost especially on large data sets.

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  |> yield(name: "mean")

You can still filter data after calling an aggregate, which is useful when you want to filter on the results of the aggregation. But for filters that only check the raw data in your bucket, it’s always best to apply them first. In fact, that’s what the InfluxDB UI’s query builder does automatically!
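For instance, if you only wanted to see the windows where average CPU usage crossed some threshold, you could filter on _value after the aggregation (the 80.0 threshold here is just an illustrative number, not anything from the queries above):

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  // this filter checks the aggregated means, so it has to come after aggregateWindow
  |> filter(fn: (r) => r["_value"] > 80.0)
  |> yield(name: "busy_windows")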

Don't aggregate too late either

While you don’t want to get ahead of your data when it comes to aggregation, it’s also possible to call an aggregate function too late in your query, resulting in a slower response time. For example:

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> last(column: "_time")

Again, this will do what you want: give you the last value of every field in the matching series, presented as a single row with fields as columns. However, this query has InfluxDB performing the pivot on the entire data set, only to once again throw away all that hard work immediately afterwards.

Here, it’s better to call the last aggregate function before pivoting the data, reducing the data that has to be transformed to just the values you want in the end.

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> last(column: "_time")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

Pushdown opportunities

When your Flux query is executed, the first thing it does is read your measurement data from the storage layer of InfluxDB before performing your calculations and transformations on that data in memory. Some of those steps can be performed by the storage layer instead, meaning less data being read into memory to begin with.

When this happens we call it “pushing down” those steps to the layer below Flux, the storage engine layer, and query patterns that can do this are called pushdown patterns. Not only do these result in less data being read into memory, they reduce the amount of work the Flux engine has to do.

The first and most basic pushdown pattern is the common from() |> range() |> filter() chain that most Flux queries start with. If you place your aggregate function immediately after one of these chains, certain aggregate functions, last() among them, can also be pushed down to the storage layer. Let’s take a look at our last query:

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  |> last(column: "_time")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

Because we put the last function immediately after a from() |> range() |> filter() pattern, it will also be pushed down to the storage layer. This means the only data actually read into the Flux runtime is the last value of our cpu measurement for each selected series, and the only step executed in memory is the pivot at the end. If we had instead called last after the pivot, not only would all of that extra data have been read into memory, but both the pivot and last functions would have been executed in the Flux layer.
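One caveat worth keeping in mind: pushdowns only apply while the chain of pushdown-capable functions is unbroken. If you slip a non-pushdown function such as map() between the filter and the aggregate, everything from that point on runs in the Flux layer instead. The map() call below is purely illustrative:

from(bucket: "myBucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["host"] == "myHost")
  |> filter(fn: (r) => r["cpu"] == "cpu-total")
  // map() is not pushdown-capable, so it breaks the chain...
  |> map(fn: (r) => ({r with _value: r._value / 100.0}))
  // ...and last() now runs in the Flux runtime over all the raw data
  |> last(column: "_time")
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

If you need a transformation like this, apply it after the aggregate whenever the logic allows, so the pushdown chain stays intact.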

Putting it all together

So to recap: aggregation is great, and Flux is really good at it. You can make it work even better for you by doing as much filtering as possible before applying an aggregation, waiting to transform the data until after the aggregation, and keeping an eye out for opportunities to use pushdown patterns that let the storage layer do more of the heavy lifting.
