TL;DR InfluxDB Tech Tips — From Subqueries to Flux!

In this post we translate InfluxQL subqueries from InfluxDB version 1.x into Flux, a data scripting and functional query language available in InfluxDB version 1.8 and later, in both OSS and Cloud. The subqueries translated here come from this blog. This post assumes that you have a basic understanding of Flux. If you’re entirely unfamiliar with Flux, I recommend that you check out the following documentation and blogs:

How do I translate the following InfluxQL subqueries to Flux?

Subquery 1: Perform functions on functions on functions

The following subquery calculates the average number of passengers for each value of the subway tag and returns only the maximum of those averages:

> SELECT MAX("mean") FROM (SELECT MEAN("passengers") FROM "schedule" GROUP BY "subway")

Flux Translation 1: Perform functions on functions on functions

Select the bucket you want to query and the time range you want to query over. Filter for the field of interest, then calculate the mean of the _value column. Flux groups data into tables according to group keys, and data returned by from() is grouped by series, so you don’t need to group by the subway tag key: it is already part of the group key. Finally, call group() with no arguments to merge all of your mean values into one table, and take the max of those means.

from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "passengers") 
  |> mean(column: "_value")
  |> group()
  |> max()

With InfluxDB 1.7 and earlier, an InfluxQL query returns a single, unorganized collection of data. With Flux in InfluxDB 1.8 and later, query results are organized as annotated CSV, a format that carries metadata to help you perform more sophisticated queries. Flux returns data in organized groups called tables. Each table has a group key: the list of columns for which every row in the table has the same value. In annotated CSV, each column is marked true or false to show whether it is part of the group key. For example, a column might be named “subway” and represent our subway tag key. If “subway” is marked true, then every row within a given table has the same “subway” value. This happens in one of two ways: 1) the column was already part of the group key, because from() returns one table per series and tags are part of each series, or 2) you grouped by that column with the group() function. Either way, your data is split into smaller tables where, for example, all the rows of the “subway” column have the value “station A” in one table and “station B” in another.
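To make the group key idea concrete, here is a minimal sketch that runs the same mean-then-max pipeline on a few inline rows instead of a bucket. It assumes a Flux version that ships the array package (newer than the Flux bundled with InfluxDB 1.8), and the station names and passenger counts are made up for illustration.

```flux
import "array"

// Hypothetical sample rows: one ungrouped table with an empty group key.
data = array.from(
    rows: [
        {subway: "station A", _value: 10.0},
        {subway: "station A", _value: 20.0},
        {subway: "station B", _value: 30.0},
        {subway: "station B", _value: 50.0},
    ],
)

data
    // Put "subway" in the group key: one table per station.
    |> group(columns: ["subway"])
    // One mean per table (per station).
    |> mean(column: "_value")
    // Empty group key again: merge the per-station means into one table.
    |> group()
    // Keep the row with the largest mean.
    |> max()
```

With these made-up rows, station A averages 15 and station B averages 40, so the final table should hold a single row for station B with a _value of 40. When querying a real bucket, the explicit group(columns: ["subway"]) step is not needed, because the tag is already part of the group key.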

Subquery 2: Perform additional analysis on the results of another query

Another use for subqueries is to execute a function on the results of a mathematical operation. The following query calculates the ratio of spilled_coffee to passengers and returns the average of those quotients:

> SELECT MEAN("spills_per_person") FROM (SELECT "spilled_coffee"/"passengers" AS "spills_per_person" FROM "schedule" GROUP BY "subway")

This use case is equivalent to performing mathematical operations within a function.

Flux Translation 2: Perform additional analysis on the results of another query

The Flux query is similar to the first Flux translation, except this time we filter for two fields, spilled_coffee and passengers. Next, we pivot the table so that each field key becomes its own column and the values of both fields for a given timestamp sit in the same row. Then we use the map() function to perform math across each row, dividing spilled_coffee by passengers. Finally, we use the group() function to merge the results of map() into one table so that mean() is applied across all of them.

from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "spilled_coffee" or r["_field"] == "passengers")  
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with spills_per_person: r.spilled_coffee / r.passengers }))
  |> group()
  |> mean(column: "spills_per_person")

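Alternatively, use the v1.fieldsAsCols() function instead of pivot(). In newer Flux versions the same function is also available as schema.fieldsAsCols() from the influxdata/influxdb/schema package. With the v1 import, the query looks like this: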

import "influxdata/influxdb/v1"

from(bucket: "schedule")
  |> range(start: -10d)
  |> filter(fn: (r) => r["_field"] == "spilled_coffee" or r["_field"] == "passengers")
  |> v1.fieldsAsCols()
  |> map(fn: (r) => ({ r with spills_per_person: r.spilled_coffee / r.passengers }))
  |> group()
  |> mean(column: "spills_per_person")
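To see what the pivot step contributes, the sketch below applies pivot(), map(), and mean() to a few inline rows. It assumes a Flux version that includes the array package, and the timestamps and values are invented for illustration.

```flux
import "array"

// Hypothetical sample rows in the usual "one row per field value" shape.
array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _field: "passengers", _value: 20.0},
        {_time: 2021-01-01T00:00:00Z, _field: "spilled_coffee", _value: 5.0},
        {_time: 2021-01-01T00:10:00Z, _field: "passengers", _value: 10.0},
        {_time: 2021-01-01T00:10:00Z, _field: "spilled_coffee", _value: 5.0},
    ],
)
    // After the pivot there is one row per timestamp, with
    // "passengers" and "spilled_coffee" as separate columns.
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    // Row-wise math is now possible because both values share a row.
    |> map(fn: (r) => ({r with spills_per_person: r.spilled_coffee / r.passengers}))
    |> mean(column: "spills_per_person")
```

The two rows produce quotients of 0.25 and 0.5, so the mean should come out to 0.375. The sample values are floats on purpose: if your fields are stored as integers, Flux performs integer division, so you would likely want to wrap each operand in float(v: ...) before dividing.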

Note that this is a direct translation of the query found in this blog, where we assume that all of the fields are part of the same measurement. In earlier versions of InfluxDB (1.7 and earlier), you can’t perform math across measurements. With InfluxDB 1.8+ and Flux, you can perform joins and math across measurements. A similar query for fields in separate measurements would look like this:

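// First stream: the writeReq field from the influxdb_httpd measurement
httpd = from(bucket: "telegraf")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_httpd" and r._field == "writeReq")

// Second stream: the pointReq field from the influxdb_write measurement
write = from(bucket: "telegraf")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "influxdb_write" and r._field == "pointReq")

// Join on timestamp; the two _value columns become _value_httpd and _value_write
avg_batch_size = join(tables: {httpd: httpd, write: write}, on: ["_time"])
  |> map(fn: (r) => ({_value: r._value_write / r._value_httpd}))
  |> mean()
  |> yield()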

First we filter for data from two measurements, “influxdb_httpd” and “influxdb_write”. Then we join the two tables on their timestamps. Finally, we use the map() function to perform the division and mean() to average the resulting quotients.
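The join can be tried without a telegraf bucket by substituting inline rows for the two streams. This is only a sketch: it assumes a Flux version with the array package, and the timestamps and counts are made up.

```flux
import "array"

// Hypothetical stand-ins for the two filtered streams.
httpd = array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _value: 2.0},
        {_time: 2021-01-01T00:01:00Z, _value: 4.0},
    ],
)

write = array.from(
    rows: [
        {_time: 2021-01-01T00:00:00Z, _value: 10.0},
        {_time: 2021-01-01T00:01:00Z, _value: 40.0},
    ],
)

// Rows with matching _time are combined; the colliding _value columns
// are renamed with the keys used in the tables record.
join(tables: {httpd: httpd, write: write}, on: ["_time"])
    |> map(fn: (r) => ({_time: r._time, _value: r._value_write / r._value_httpd}))
    |> mean()
```

Here the per-timestamp quotients are 5 and 10, so the mean should be 7.5. Rows whose timestamps do not match in both streams are dropped by the join, which is worth keeping in mind when the two measurements are not written at identical times.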

Subquery 3: Place specific conditions on the results of another query

Execute a function and return only those results that meet a specific condition. This use case is similar to SQL’s HAVING clause. The following query calculates the minimum number of passengers at ten-minute intervals and returns only those minimum values that are greater than 15:

> SELECT "min_pass" FROM (SELECT MIN("passengers") AS "min_pass"  FROM "schedule" WHERE time >= '2017-01-25T18:00:00Z' AND time <= '2017-01-25T18:15:00Z' GROUP BY time(10m)) WHERE "min_pass" > 15

Flux Translation 3: Place specific conditions on the results of another query

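This Flux query uses the aggregateWindow() function to calculate the minimum of the data for every 10-minute window. A second filter then keeps only those minimums that are greater than 15 passengers. Note that Flux’s range() excludes the stop time, whereas the InfluxQL query uses time <=, so a point stamped exactly 18:15:00 would be left out of the Flux result.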

from(bucket: "schedule")
  |> range(start: 2017-01-25T18:00:00Z, stop: 2017-01-25T18:15:00Z)
  |> filter(fn: (r) => r["_field"] == "passengers")
  |> aggregateWindow(
       every: 10m,
       fn: min)
  |> filter(fn: (r) => r["_value"] > 15)
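The windowing and the HAVING-style filter can be sketched on inline rows as well. As before, this assumes a Flux version with the array package, and the passenger counts are invented.

```flux
import "array"

// Hypothetical passenger counts inside the queried time range.
array.from(
    rows: [
        {_time: 2017-01-25T18:00:00Z, _value: 20},
        {_time: 2017-01-25T18:05:00Z, _value: 18},
        {_time: 2017-01-25T18:10:00Z, _value: 12},
        {_time: 2017-01-25T18:12:00Z, _value: 25},
    ],
)
    // range() adds the _start and _stop columns that aggregateWindow() relies on.
    |> range(start: 2017-01-25T18:00:00Z, stop: 2017-01-25T18:15:00Z)
    // One minimum per 10-minute window.
    |> aggregateWindow(every: 10m, fn: min)
    // Keep only windows whose minimum exceeds 15.
    |> filter(fn: (r) => r._value > 15)
```

The first window has a minimum of 18 and the second a minimum of 12, so only the first window's row should survive the filter. By default aggregateWindow() stamps each result with the end of its window, whereas InfluxQL's GROUP BY time() reports the start, so the timestamps of the two results differ even when the values agree.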

What else is there?

I hope this tutorial helps you understand how to use Flux. As always, if you run into hurdles, please share them on our community site or Slack channel. We’d love to get your feedback and help you with any problems you run into.