TL;DR InfluxDB Tech Tips: Multiple Aggregations with yield() in Flux

The yield() function specifies which table streams a Flux script should return as results. It also assigns a name to each result it returns. The name is stored in the default annotation of the Annotated CSV output.

For example, if we query the following table:

_measurement tag1 _field _value _time
Measurement1 tagvalue1 field1 1i 2021-09-17T21:22:52.00Z

Without the yield function:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

The following Annotated CSV output is returned. Notice that the default annotation is set to _result, the default result name.

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,_result,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

Now if we add the yield() function:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name: "myFluxQuery")

The following Annotated CSV output is returned. Notice the default annotation has been changed to myFluxQuery.

#group,false,false,true,true,false,false,true,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string
#default,myFluxQuery,,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement,tag1
,,0,2021-08-17T21:22:52.452072242Z,2021-08-17T21:23:52.452072242Z,2021-08-17T21:23:39.010094213Z,1,field1,Measurement1,tagvalue1

The yield() function returns the table stream as it exists at the point in the pipeline where yield() is called, and delivers it as a separately named result.
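For readers who want to try this without a bucket, here is a minimal sketch that swaps from() for in-memory data built with array.from() from the Flux standard library. The single row mirrors the example table above. Using array.from() is an assumption made for illustration and is not part of the original query.

```flux
import "array"

// Hypothetical in-memory stand-in for the data in "bucket1"
data =
    array.from(
        rows: [
            {
                _time: 2021-09-17T21:22:52Z,
                _measurement: "Measurement1",
                tag1: "tagvalue1",
                _field: "field1",
                _value: 1,
            },
        ],
    )

// Return the stream under the result name "myFluxQuery"
data
    |> yield(name: "myFluxQuery")
```

Removing the yield() line and ending the script with a bare `data` expression should return the same row under the default result name instead.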

The yield() function is important because invoking multiple yield() functions allows you to return multiple table streams from a single Flux script simultaneously.

Returning multiple aggregations with multiple yield() functions

Imagine that you want to return the min(), max(), and mean() values of a single table:

_measurement _field _value _time
measurement1 field1 1.0 2021-09-17T21:22:52.00Z
measurement1 field1 2.0 2021-10-17T21:22:52.00Z
measurement1 field1 4.0 2021-11-17T21:22:52.00Z
measurement1 field1 5.0 2021-12-17T21:22:52.00Z

New Flux users, especially those with a SQL or InfluxQL background, are often inclined to run the following Flux query:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> max()
|> mean()

This is because they’re accustomed to being able to perform SELECT min("field1"), max("field1"), mean("field1"). However, the Flux query above would return only the min value. Flux is pipe-forwarded, so max() and mean() operate on the single row that min() outputs rather than on the original data. To return the min, max, and mean together, you must use multiple yield() functions:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()
|> yield(name: "min") 

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> max()
|> yield(name: "max") 

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> mean()
|> yield(name: "mean")

The above script would result in three tables:

Result: min

_measurement _field _value _time
measurement1 field1 1.0 2021-09-17T21:22:52.00Z

Result: max

_measurement _field _value _time
measurement1 field1 5.0 2021-12-17T21:22:52.00Z

Result: mean

_measurement _field _value
measurement1 field1 3.0

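An aside: Remember that the mean() function doesn’t return a timestamp column because it’s an aggregate function; the mean value isn’t associated with any single timestamp. The min() and max() functions are selectors, so they return the original row, including its _time value.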
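To see why the chained min() |> max() |> mean() query shown earlier returns only the minimum, it can help to yield the stream after every step. The sketch below is an illustration, not part of the original post: it assumes in-memory rows built with array.from() in place of the bucket, and the result names after_min, after_max, and after_mean are hypothetical.

```flux
import "array"

// Hypothetical in-memory copy of the four example values
data =
    array.from(
        rows: [
            {_time: 2021-09-17T21:22:52Z, _value: 1.0},
            {_time: 2021-10-17T21:22:52Z, _value: 2.0},
            {_time: 2021-11-17T21:22:52Z, _value: 4.0},
            {_time: 2021-12-17T21:22:52Z, _value: 5.0},
        ],
    )

data
    // Selector: keeps only the row with the smallest _value (1.0)
    |> min()
    |> yield(name: "after_min")
    // Sees just that one row, so the "max" is still 1.0
    |> max()
    |> yield(name: "after_max")
    // Aggregate of a single value: 1.0, and the _time column is dropped
    |> mean()
    |> yield(name: "after_mean")
```

All three results carry the value 1.0. Only the last one lacks a _time column, which matches the selector versus aggregate distinction in the aside above.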

Using variables to perform multiple aggregations

While the Flux query above will yield all three transformations, it’s not an efficient query because you’re querying for the entire dataset multiple times. Instead, store the base query in a variable and reference it like so:

data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

data_min = data
|> min()
|> yield(name: "min") 

data_max = data
|> max()
|> yield(name: "max") 

data_mean = data
|> mean()
|> yield(name: "mean")

Important note: To avoid naming conflicts, don’t give your variables the same names as functions (for example, min, max, or mean).
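The variable pattern above can also be tried without a bucket. The following sketch assumes the example values are supplied in memory through array.from(), which is a substitution made for illustration; the pipeline after the variable assignment follows the same shape as the query above.

```flux
import "array"

// Hypothetical in-memory stand-in for the filtered base query
data =
    array.from(
        rows: [
            {_time: 2021-09-17T21:22:52Z, _measurement: "measurement1", _field: "field1", _value: 1.0},
            {_time: 2021-10-17T21:22:52Z, _measurement: "measurement1", _field: "field1", _value: 2.0},
            {_time: 2021-11-17T21:22:52Z, _measurement: "measurement1", _field: "field1", _value: 4.0},
            {_time: 2021-12-17T21:22:52Z, _measurement: "measurement1", _field: "field1", _value: 5.0},
        ],
    )

// Each branch starts from the same base stream and gets a unique result name
data
    |> min()
    |> yield(name: "min")

data
    |> max()
    |> yield(name: "max")

data
    |> mean()
    |> yield(name: "mean")
```

With these four values, the min result is 1.0, the max result is 5.0, and the mean result is 3.0.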

With and without the yield() function

It’s worth recognizing that you aren’t limited to using the yield() function once per query or query variable. You can use the yield() function multiple times within the same query. For example, if you just wanted to return both the min value from a query and the raw data, you could use the yield() function twice:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> yield(name: "raw data")
|> min()
|> yield(name: "min")

Additionally, you don’t need to use the yield() function to return the results of a single query. The following query returns a table stream because, without a yield() function, InfluxDB returns the result under the default name “_result” in the default annotation:

from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )
|> min()

However, if you run two queries in one script without yielding each of them, you will get an error. The following query:

data = from(bucket: "bucket1")
|> range(start: 2021-08-17T21:22:52.452072242Z)
|> filter(fn: (r) => r["_measurement"] == "Measurement1" and r["tag1"] == "tagvalue1" and r["_field"] == "field1" )

data_min = data
|> min()

data_max = data
|> max()

Produces the following error:

error in query specification while starting program: this Flux script returns no streaming data. Consider adding a "yield" or invoking streaming functions directly, without performing an assignment.

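This error occurs because both streams are assigned to variables and neither is passed to yield(), so the script returns no data. InfluxDB also cannot return different table streams under the same default “_result” name, so each stream needs its own yield() with a unique name.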
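One way to resolve the error is to give each stream its own uniquely named yield(). The sketch below shows that approach under one assumption: in-memory data from array.from() stands in for the bucket query so the script can run on its own.

```flux
import "array"

// Hypothetical in-memory stand-in for the filtered base query
data =
    array.from(
        rows: [
            {_time: 2021-09-17T21:22:52Z, _value: 1.0},
            {_time: 2021-10-17T21:22:52Z, _value: 2.0},
        ],
    )

// Each stream is yielded under a unique name instead of only being assigned
data
    |> min()
    |> yield(name: "min")

data
    |> max()
    |> yield(name: "max")
```

The same change applies to the bucket-based script: append a yield() with a distinct name to each of the two pipelines.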

Conclusion

I hope this post inspires you to take advantage of the yield() function. If you are writing Flux and need help, please ask on our community site or in our Slack channel. If you’re developing a cool IoT application on top of InfluxDB, we’d love to hear about it, so make sure to share your story! Additionally, please share your thoughts, concerns, or questions in the comments section. We’d love to get your feedback and help you with any problems you run into!

Further reading

While this post aims to provide a comprehensive overview of how to use the yield() function, the following resources might also interest you:

  1. TL;DR InfluxDB Tech Tips – How to Interpret an Annotated CSV: This post describes how to interpret an Annotated CSV, the Flux query result format for InfluxDB.
  2. Top 5 Hurdles for Flux Beginners and Resources for Learning to Use Flux: This post describes common hurdles for Flux beginners and how to tackle them by using the InfluxDB UI, understanding Annotated CSV, and more.
  3. Top 5 Hurdles for Intermediate Flux Users and Resources for Optimizing Flux: This post describes common hurdles for intermediate and advanced Flux users while providing more detail on pushdown patterns, how the Flux engine works, and more.
  4. TL;DR InfluxDB Tech Tips – Optimizing Flux Performance in InfluxDB Cloud: This post describes how to optimize your Flux performance with the Flux Profiler and the Flux VS Code extension.