Introduction to Flux and Functional Data Scripting
Webinar Date: 2021-04-29 08:00:00 (Pacific Time)
Flux is a functional data scripting language designed to accommodate a wide array of data processing and analytical operations. We will explain the origins of Flux, walk through key Flux concepts and outline basic Flux syntax.
- Understand the history of and motivations behind Flux
- Become familiar with foundational Flux concepts
- Become familiar with basic Flux syntax
Watch the Webinar
Watch the webinar “Introduction to Flux and Functional Data Scripting” by filling out the form and clicking on the Watch Webinar button on the right. This will open the recording.
Here is an unedited transcript of the webinar “Introduction to Flux and Functional Data Scripting”. This is provided for those who prefer to read rather than watch the webinar. Please note that the transcript is raw. We apologize for any transcribing errors.
- Caitlin Croft: Customer Marketing Manager, InfluxData
- Scott Anderson: Senior Technical Writer, Tech Lead, InfluxData
Caitlin Croft: 00:00:00.159 Welcome to today’s webinar. My name is Caitlin Croft. I’m really excited to see you all here today to learn more about Flux. I’m joined today by Scott, who’s part of our technical writing team. And he really knows Flux, so I’m super excited to see him here. Once again, the session is being recorded and will be made available later today. Without further ado, I’m going to hand things off to Scott.
Scott Anderson: 00:00:29.596 Hey, Caitlin. Thank you again, everyone, for attending today. As Caitlin introduced me, I’m Scott Anderson. I am part of our Docs team here at InfluxData. My official title is senior technical writer and tech lead of the Docs team. Today, we’re going to do a really basic introduction to Flux: what it is, its syntax, and some foundational principles that, hopefully, as you begin to learn Flux and begin to use Flux, and even if you already know Flux, might inform and help you grow your Flux knowledge and Flux skills. So if you’re unfamiliar with Flux — sorry. One second. There we go.
Scott Anderson: 00:01:16.161 So what is Flux? Flux is a functional data scripting language designed to query, process, and act on data. And I emphasize data scripting because most people refer to it as a query language. When, yes, it is a query language, but it can do so much more than that. I mean, the query part of what Flux does is it pulls data from a data source. But beyond that, you can use Flux to process that data much like you would do in Python pandas or R or anything like that. And the functionality available for data processing is only growing within the Flux language. It also allows you to act on data. So you can monitor metrics that you’re tracking. And if they cross a threshold, you can trigger an alert to Slack or to PagerDuty or whatever. All that can be done within a single syntax, rather than doing all of those different tasks across multiple different syntaxes.
Scott Anderson: 00:02:15.117 When Flux was first under development, as they set out to design this new language, they had design goals. These are the overarching design principles and goals behind Flux. It needs to be a Turing-complete functional language. It needs to be usable. It needs to be readable, flexible, extensible, testable, which there’s been a lot of progress on recently, contributable, and shareable. So anything that our Flux team is doing to extend Flux in some way relates to one of these design goals. These are — I’m lost for words, but — the guiding principles. That’s what I meant. That’s the word I’m looking for. The guiding principles behind the development of Flux.
Scott Anderson: 00:03:05.895 So I think to really learn how to use Flux, you have to understand how it works, obviously. And for many of us, this can be a bit of a barrier to entry because it’s not your typical functional language. It’s very focused on data. So for me, as I was first learning the language, I kind of had to find a metaphor to help me understand exactly what this language does. And the metaphor that I landed on was that of water processing. So whenever you’re processing water, you have a reservoir of water that you’re pulling water from. That reservoir isn’t necessarily clean. There’s a ton of it. You don’t need all of it, so you begin to take a little bit out at a time. So you have a pump house that pulls out a limited amount of water and then it pipes that water into a settling tank to remove sediment.
Scott Anderson: 00:04:05.907 You can then process it with additives that might kill any bacteria or any other harmful things in the water. You can then separate it out and process it in different ways for different purposes. But the ultimate goal of removing or pulling that data or pulling that water out of that source is to turn it into something digestible and usable in small amounts. Something that you can return back and it’s just easy to use. So if we look at Flux and the query functionality of Flux and the processing functionality of Flux, this metaphor is what helped me to really understand it: you’re pulling data from a large reservoir of data, from a data source with gigabytes, in some cases terabytes, of data. But you only need to pull back a little bit at a time, and you shape and process that data specific to the information that you are trying to get and then return that information after it’s all been processed so that it’s easily digestible and easy to work with.
Scott Anderson: 00:05:16.135 So this is a basic Flux query. The from function kind of acts as that pump house. It’s pulling data from a source. In this case, the from function is pulling from InfluxDB. That data is then piped forward into other types of functions that do different things. And we’ll get more into these later. But just a quick summary. Range filters data by time, filter filters data by column values within each row, and aggregateWindow applies an aggregate to windows of time. But this data, again, is being pulled from a source, piped through this pipeline, and processed as it goes along.
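The slide itself is not reproduced in the transcript. As a rough sketch only, a query with the shape described above might look like the following. The bucket name `example-bucket`, the measurement and field names, and the five-minute window are assumptions for illustration, not values taken from the webinar.

```flux
// Assumed names: "example-bucket", "mem", "used_percent".
from(bucket: "example-bucket")
    // range() filters rows by time: here, the last hour up to now.
    |> range(start: -1h)
    // filter() keeps rows whose column values satisfy the predicate.
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    // aggregateWindow() applies an aggregate (mean) to each window of time.
    |> aggregateWindow(every: 5m, fn: mean)
```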
Scott Anderson: 00:06:01.026 So one thing that, again, is another barrier to entry into understanding how to use Flux is just the data structure. And this I’ve seen can be confusing to people. So hopefully, I can help clarify it today. So if we’re looking at that actual pipe of data that’s coming through, that water that’s going through the pipeline and being purified and being processed, inside of that pipe, we have a table or multiple tables. And what this is referred to in Flux is a stream of tables. So a stream of tables can, again, have a single table or multiple tables, and in some cases, no tables. But it’s a logical container for data that gets returned from a function or from a source.
Scott Anderson: 00:06:55.396 So if we look at one of these tables, what a table is, and this is — usually when you think about data, you think about rows, and each row has a column. In Flux, it may be better to think about columns where each column is divided into rows. So in this case, we have a version, a user ID, a session ID, a field, and a value column. And for each of these columns, we have multiple rows. And in each row, all of those values are associated by their context within a row. But if we focus on the columns, Flux really is built around columns and manipulating columns. So with a stream of tables, to have multiple tables, each table is actually — it’s controlled by a group key. This group key gets applied to the whole stream of tables. But essentially what a group key is, is it’s a list of columns in which all the values for that column are the same in each table.
Scott Anderson: 00:08:03.955 So in this case, my group key is empty. It’s not specifying any columns that I want to group by. So I get a single table in my stream of tables. It just groups everything into one table because there’s nothing in the group key. But let’s say I want to group by version and user ID. You can use the group function in Flux to modify the group key and actually add columns or specify the columns that should be in the group key. So in this case, I have a version and user ID. It breaks it into two tables. Because although my user ID is the same throughout the data, my version is different. And it takes all of the unique version values and groups them into individual tables. If I were to have another unique user ID in here, it would create new tables specifically for those user ID and version combinations. So that’s how you have multiple tables inside of a stream of tables and it’s all controlled by the group key.
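To make the group key behavior concrete, here is a minimal sketch that builds a small table in memory and regroups it. The column names and row values are hypothetical stand-ins for the slide data, which is not included in the transcript.

```flux
import "array"

// Hypothetical sample rows; column names and values are illustrative only.
data =
    array.from(
        rows: [
            {version: "1.0", userID: "u1", sessionID: "s1", _field: "clicks", _value: 3},
            {version: "1.0", userID: "u1", sessionID: "s2", _field: "clicks", _value: 5},
            {version: "2.0", userID: "u1", sessionID: "s3", _field: "clicks", _value: 2},
            {version: "2.0", userID: "u1", sessionID: "s4", _field: "clicks", _value: 7},
        ],
    )

// array.from() returns a single table with an empty group key.
// group() sets the group key to the listed columns, which splits the rows
// into one table per unique version/userID combination (two tables here).
data
    |> group(columns: ["version", "userID"])
```

Adding a row with a different `userID` would produce an additional table for that new version and user ID combination.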
Scott Anderson: 00:09:06.851 Now, something that’s important to understand is that Flux, all it really knows about is columns and rows. The actual structure and schema of the data that gets returned by a data source completely depends on the data source. For example, if I were to query a MySQL Database with Flux, it would return a single table without any columns in the group key. So it’s a pretty small table. It’s easy to work with. But any SQL query that you run, the way that the SQL integration works is the stream of tables will always contain a single table with an empty group key. But if I query InfluxDB, InfluxDB, by default, returns data grouped by series. So a series is an InfluxDB concept.
Scott Anderson: 00:10:08.640 Essentially, the definition of a series is data points that share a common measurement, a common tag set, and a common field key. But all of those concepts live within InfluxDB itself, not inside Flux. InfluxDB is just returning the data grouped in a certain way. So in this case, the group key is going to be the start column, the stop column, measurement, host, and field. Now, start and stop, those columns are added by the range function. But the measurement, host, and field, that is the series key, right? The common measurement, the common tag set, and the common field key. So by default, InfluxDB returns data grouped by series. And you can modify the grouping, you can change how that data is shaped, and process that data any way you want once Flux returns it from the source.
Scott Anderson: 00:11:07.527 So with that foundational understanding of just data structure and how that stream of tables gets pulled from a source and you can have multiple tables in a stream of tables, let’s dive into just some basic Flux syntax. I’ve mentioned a little bit already. You’ve probably seen this in any Flux script you’ve looked at. This is the pipe-forward operator. This is the pipe of water that is going from function to function. This operator takes the output data of one function and pipes it into the next function as input data. It’s a foundational core operator inside of Flux. Flux supports the following basic types. We have strings, Booleans, floats, integers, and unsigned integers; both of those are 64-bit. We have duration values, time values, which are just RFC 3339 timestamps, and then we have byte arrays.
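As an illustration of the basic types listed above, the following sketch assigns one literal of each type to a variable. The variable names and values are made up for the example.

```flux
// One value of each basic type; names and values are illustrative.
aString = "hello"
aBoolean = true
aFloat = 1.5
anInteger = 42                      // 64-bit signed integer
anUnsignedInteger = uint(v: 42)     // 64-bit unsigned integer
aDuration = 1h30m
aTime = 2021-04-29T08:00:00Z        // RFC 3339 timestamp
someBytes = bytes(v: "hello")       // byte array converted from a string
```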
Scott Anderson: 00:13:31.741 Flux has really simple variable declarations. You just provide a variable name and a variable value with the equals operator. So you can use any of the basic data types to define a variable. You can also use composite data types to define a variable. And you can assign a variable to a stream of tables. So this from function is just an example of how you can assign a stream of tables to a variable. So when it comes to referencing keys inside of these composite types, I had to take a minute to just illustrate the variable declaration to provide some context for this example. Let’s say I define R as this record with key 1 and key 2. To reference each of those keys, I can either use dot notation. So r.key1, which will return value 1. I can also use bracket notation, where I have the square brackets with the key as a string inside of those. That too will return value 1. These two notations are functionally the same.
Scott Anderson: 00:14:42.869 The reason that bracket notation is important is if you have any white space or special characters in your key name, you need to use bracket notation to be able to reference those values. For arrays, you use bracket notation and you pass the index of the element you want to return, starting with zero. So the zero index of this array will return 12. So for dictionaries, we don’t have a syntax specifically for referencing values in the dictionary, but we do have a function to retrieve a value in the dictionary. The dict package provides functions for accessing and modifying dictionaries. So dict.get, you pass in a dictionary to retrieve a value from. So in this case, it’s a dict example or dictEx. You then pass in the key you want to retrieve and then a default value if that key doesn’t exist. So in this case, this function returns foo because the 1 key does exist. But if I were to pass in 3, it would return baz. It would return that default value because the 3 key doesn’t exist inside of that dictionary.
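The slides for this part are not in the transcript, so the following is a reconstruction under assumptions. The record keys, the first array element (12), `dictEx`, and the `foo` and `baz` values come from the description above; the remaining array elements, the second dictionary entry, and the key containing a space are hypothetical.

```flux
import "dict"

// Record: dot notation and bracket notation return the same value.
r = {key1: "value1", key2: "value2"}
viaDot = r.key1          // "value1"
viaBracket = r["key1"]   // "value1"

// Keys with white space or special characters need bracket notation.
person = {"first name": "John"}
firstName = person["first name"]

// Array: bracket notation with a zero-based index.
arr = [12, 23, 34]
firstElement = arr[0]    // 12

// Dictionary: values are retrieved with dict.get().
dictEx = [1: "foo", 2: "bar"]
found = dict.get(dict: dictEx, key: 1, default: "baz")     // "foo"
fallback = dict.get(dict: dictEx, key: 3, default: "baz")  // "baz"
```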
Scott Anderson: 00:16:02.106 Now, the reason that dictionaries are really handy, let’s look at a record first. So again, we’ll take that same record we defined earlier. That same variable. We have R. And let’s say I want to dynamically access a key inside of that record. So I have another variable that says key and right now it’s defined as key 1. If I try to use dot notation to reference a key using that variable, it will return an error because that key doesn’t exist. Because records, when passing in the key to reference, it can’t process the actual key name. Those have to be static. So if I use bracket notation — and I just realized a typo here. There shouldn’t be a period after that in the bracket notation. But if I use bracket notation, again it will return an error. Even if I use string interpolation inside of that string, it will return an error because when referencing a record, those values have to be static. This goes into how Flux plans and executes the query.
Scott Anderson: 00:17:12.489 With dictionaries, if I define an R dictionary — again, the keys in the dictionary all have to be the same data type and the values all have to be the same data type. So in this case, I have key 1 and key 2, and then my values are all strings. Even though one of them should be an integer, I can always process that back into an integer after the fact. So I want to dynamically select a key in my dictionary using a variable. Dictionaries allow for this. They allow for dynamic key selection. So this function dict.get, I pass in my R dictionary. I pass in my key variable and provide a default if that key doesn’t exist. In this case, it will return value one. That is why dictionaries are so handy. That’s why they are introduced in the language, is to provide dynamic key selection inside of a composite type.
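A minimal sketch of the dynamic key selection described above. The dictionary contents are assumed from the description (string keys, string values, one value that "should" be an integer); the empty-string default is an arbitrary choice for the example.

```flux
import "dict"

// All keys share one type and all values share one type (strings here).
r = ["key1": "value1", "key2": "2"]

// The key to look up is held in a variable rather than written statically.
key = "key1"

// dict.get() accepts the variable, which record references do not allow.
selected = dict.get(dict: r, key: key, default: "")  // "value1"
```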
Scott Anderson: 00:18:08.085 So functions. We’ll spend a little bit of time here because obviously with Flux being a functional language, functions are highly important. With functions, this is a really basic function definition. You provide a function name, which this function is named F. You then specify parameters for that function, which in this case is X and Y. You then pass those parameters into an operation, and that operation returns a value based on the parameters that are passed in. So this function you have X divided by Y. So to use that function to call it, you have the function name and then you pass the parameters in. Parameters in Flux are named. Flux doesn’t currently support positional parameters. So you can’t just drop values in an order inside of this parameter list, and Flux will know what to do with it. They have to be named. That’s important to understand with the current state of Flux. So all parameters are named. So with this function definition, if I specify X is 12, Y is 2, this function will return the value 6.
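The function described above can be sketched in two lines. The variable name `result` is added for the example.

```flux
// A function named f with two named parameters, x and y.
f = (x, y) => x / y

// Parameters are passed by name, not by position.
result = f(x: 12, y: 2)  // 6
```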
Scott Anderson: 00:19:21.983 So there are actually a couple of different types of functions. And I’m not going to cover all of them today, but I’ll cover what I think are the most common and probably the most important to understand. So the first one is a static function. And this is a term that I use. I can’t think of a better way to describe this than static. Essentially, you provide static input and it provides static output or scalar output. And not necessarily the values are all scalar that you put in or that are returned, but the data, it’s using one of the basic data types or the composite data types as input and returning those as output. The second one is a transformation function. Now, what a transformation is, is a function that takes a stream of tables as input and returns a stream of tables. So in this case, this F function, it has a T parameter, and that T parameter equals the input stream of tables. So that left arrow operator or the less than hyphen combination, that represents input tables. So data can be piped into this function using the pipe-forward operator. And this function specifically will return the tables unmodified, the stream of tables unmodified. But you can pipe T into other types of operations to modify that data before it’s output. The important thing here again is that a transformation takes a stream of tables as input and outputs a stream of tables.
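As a sketch of the transformation pattern described above: `f` follows the description (it returns its input unmodified), while `firstRows` and the in-memory sample data are hypothetical additions to show piping `t` into another operation.

```flux
import "array"

// A transformation: t=<- binds the piped-forward stream of tables to t.
// This one returns the stream of tables unmodified.
f = (t=<-) => t

// A transformation that pipes its input into another operation first.
// firstRows is a hypothetical name used only for this example.
firstRows = (t=<-, n) => t |> limit(n: n)

// Hypothetical in-memory data to pipe through the functions.
data = array.from(rows: [{_value: 1}, {_value: 2}, {_value: 3}])

data
    |> f()
    |> firstRows(n: 2)
```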
Scott Anderson: 00:21:06.144 Another type of function that is used heavily in Flux is a predicate function. And what a predicate function does, it’s an anonymous function that has one or more parameters, and then it uses predicate expressions and comparison operators to result in true or false. So, for example, my parameter in this predicate function is R and my predicate expression is 1.0 equals 1.0. So the predicate expression using those comparison operators will always resolve or will always result to true or false. So in this case, 1.0 does equal 1.0. So this resolves to true. There’s other comparison operators you can use. So 1.0 does not equal 1.0. So this will resolve to false. These are all the available comparison operators in Flux. You have equal to, not equal to, less than, greater than, less than or equal to, greater than or equal to, and then equal or not equal to a regular expression. Those are all the operators that are available.
Scott Anderson: 00:22:20.341 You then have logical operators that you can begin to chain predicate expressions with. So in this case, I have 1.0 equals 1.0, which is true. But also, with the and logical operator, I need foo does not equal bar to resolve to true. Both of these do. And because both expressions resolve to true, the overall expression resolves to true. Now, if I were to say 1.0 is greater than 1.0 and foo is not equal to bar, that first expression would resolve to false. The second would resolve to true. And since we’re using the and operator, both of these for this to resolve to true, need to resolve as true. Because one of them resolves to false — this resolves to false — the overall expression does.
Scott Anderson: 00:23:13.822 If I use the or logical operator, 1.0 is not greater than 1.0, but foo and bar are not equal. That expression will resolve to true because one of the expressions in the overall expression resolved to true. There’s also an order of operations inside of the process of resolving these predicate expressions. And you can use grouping with parentheses inside of a predicate expression. So the inner groups are always resolved first. So in this case, my foo equals bar or baz does not equal quz will resolve to true. And then it will then look at the 1.0 equals 1.0, which will resolve to true. And this overall predicate expression will resolve to true. The important takeaway here is that there is an order of operations, and it will affect how you write these predicate functions.
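The predicate expressions walked through above can be written out as follows. The variable names are added for the example, and the exact arrangement of the grouped expression on the slide is not in the transcript, so the last line is one plausible reading of it.

```flux
// Comparison operators always resolve to true or false.
alwaysTrue = (r) => 1.0 == 1.0    // true
alwaysFalse = (r) => 1.0 != 1.0   // false

// and: every expression must resolve to true.
bothTrue = (r) => 1.0 == 1.0 and "foo" != "bar"   // true
oneFalse = (r) => 1.0 > 1.0 and "foo" != "bar"    // false

// or: at least one expression must resolve to true.
eitherTrue = (r) => 1.0 > 1.0 or "foo" != "bar"   // true

// Parentheses group expressions; inner groups are resolved first.
// The inner group is true ("baz" != "quz"), so the whole expression is true.
grouped = (r) => 1.0 == 1.0 and ("foo" == "bar" or "baz" != "quz")
```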
Scott Anderson: 00:24:18.878 So to give you a glimpse of how these are used, let’s define an r variable with a record, just a really simple record for a person with the first name John, last name Doe, and he’s 42 years old. So I want to evaluate this record using a predicate expression. And I am looking for a record with the first name John and an age that is greater than 40. In this case, based on this predicate expression, this will resolve as true. This is the type of record that I’m looking for. But if I were to change this record to Mike Smith, age 29, when passed through this predicate expression, it would resolve to false. So again, we’re using dot reference here to reference the first name and the age provided in the R record. And yeah, this would resolve to false. So we’ll talk more about how those are used in Flux a little bit later.
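A small sketch of the record evaluation described above. The key names `firstName`, `lastName`, and `age`, and the function name `isMatch`, are assumptions; the transcript gives only the values.

```flux
// Hypothetical key names; the values come from the description above.
r = {firstName: "John", lastName: "Doe", age: 42}

// Predicate: first name John and age greater than 40.
isMatch = (r) => r.firstName == "John" and r.age > 40

john = isMatch(r: r)                                                // true
mike = isMatch(r: {firstName: "Mike", lastName: "Smith", age: 29})  // false
```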
Scott Anderson: 00:25:19.421 So with a basic query, in my experience, I guess, every query starts with a source. This is an example of an InfluxDB source. But you can have SQL, you can have CSV, you can pull data from an HTTP endpoint. There are multiple sources that you can pull data from. And that list is growing. But for today, I want to focus on InfluxDB. So when querying InfluxDB, after your source, you then filter that data. So again, range is a filter based on time. You’re defining a start and a stop, where the default value of the stop parameter for range is now. And if a parameter has a default, you don’t necessarily need to specify it in the function call.
Scott Anderson: 00:26:11.627 So this is filtering data to everything within the last hour. So that range filters based on time. Filter filters rows based on column values. So in this case, the filter parameter, fn or function, it takes each row in a table or a stream of tables, and each row is represented by that R value. And each row is passed into this function as a record. So if you think back to those tables that are returned by InfluxDB, what I’m saying is — in this predicate function in my filter function — is I’m looking for rows that have a measurement value of mem and a field value or a field key of used_percent. Any rows that resolve to true when they’re passed through that predicate function are then output by filter. Any rows that resolve to false are dropped and filtered out.
Scott Anderson: 00:27:19.024 After you filter data, the next step is generally to begin to shape that data. And there’s multiple ways to shape it. Probably the most common is grouping columns in a specific way or grouping data in a specific way. So just thinking back to the group key that we discussed earlier, you are defining the group key of your stream of tables. So this will group all of the rows and data points in my query data by host. So every single table will represent a unique host. Another common way to shape your data is to window data, and that’s grouping data by windows of time. You can use the window function to do that. And that will allow you to process data based on each individual window of time. That’s the next step, is to process data.
Scott Anderson: 00:28:13.402 In this case, it’s really simple. You’re computing a mean or an average of your points that are coming into Flux from InfluxDB. But this can be really anything. Any of the transformation functions that Flux provides and even custom functions that you can create to process data in the way that [inaudible]. But these are the basic parts of any Flux query. You have a source, you filter rows, you shape that data in some way, whether it’s grouping or windowing or other ways to shape the data, and then you process that data based on its shape.
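Putting the four parts together (source, filter, shape, process), a query along these lines might look like the sketch below. The bucket name and the five-minute window are assumptions; the measurement, field, and `host` grouping follow the description above.

```flux
// Source: pull data from InfluxDB ("example-bucket" is an assumed name).
from(bucket: "example-bucket")
    // Filter by time; stop defaults to now(), so it can be omitted.
    |> range(start: -1h)
    // Filter rows by column values with a predicate function.
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    // Shape: put each unique host in its own table...
    |> group(columns: ["host"])
    // ...and split each of those tables into five-minute windows of time.
    |> window(every: 5m)
    // Process: average the values in each resulting table.
    |> mean()
```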
Scott Anderson: 00:28:55.226 So I just want to take you through a quick example because there’s some important things to understand with how we process data. So here I’m defining a data variable as a stream of tables using that same query. So I’m querying from the mem measurement and the used percent field. That’s going to return data that would look something similar to this. By default, InfluxDB returns data grouped by series. So all of the data’s grouped by the same measurement, tag set, and field key. I have two separate hosts. So it’s returning the used mem percent for each of my hosts.
Scott Anderson: 00:29:37.819 So let’s say I want to just modify this data in some way. Map is an incredibly powerful function in Flux and one that I guarantee you’ll use as you start to build out your own custom processing functions or you’ll just use map by itself to process data. So what map does is it remaps each record or row in a stream of tables. So just like the filter function, it takes R as — it’s a record that represents the row. And it remaps that record. So in this case, it’s taking that R record and remapping a new record with just time and just value. Remapping those values. And in this case, it’s using the existing value to create a new one. But what’s important here is that this is explicitly remapping a new record. So the new record that gets produced will only have an underscore time column and only have an underscore value column. So the return table will actually look like this. Because we removed or we neglected to include any of the columns that were in the group key, all of the rows get grouped together. They have their timestamps and their values based on those values that we explicitly mapped in the map call.
Scott Anderson: 00:31:09.871 If you want to preserve existing columns with each row, you can use the with operator to preserve all those columns. What this does is it extends the row record. So if you define any column that already exists, it’ll override that value, or if you add a new column, it will add that column to your list. So in this case, I have r with the underscored value. So I’m remapping the existing value column, but I’m maintaining the rest of the columns. Because I’m using the with operator, it’s just extending that record. So I’m doing the R value times 0.01. That results in this stream of tables. The only column that was modified was the value column and it just multiplied each of those percentages by 0.01.
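The two map calls described above might be sketched as follows, assuming a bucket named `example-bucket`. The `yield()` names are added so both results can be returned from one script.

```flux
// Assumed bucket name; measurement and field follow the description above.
data =
    from(bucket: "example-bucket")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")

// Explicit remap: the output rows have only _time and _value columns.
data
    |> map(fn: (r) => ({_time: r._time, _value: r._value * 0.01}))
    |> yield(name: "explicit")

// Extending with "with": all existing columns are kept and
// only _value is overwritten.
data
    |> map(fn: (r) => ({r with _value: r._value * 0.01}))
    |> yield(name: "extended")
```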
Scott Anderson: 00:32:00.197 You can also add columns and you can layer in things like conditional logic here. So in this case, I’m remapping each of the rows or records inside of my stream of tables and I’m adding a state column. This column doesn’t exist in the data that’s queried, but I’m adding it, and I’m conditionally setting that value based on the existing value in the value column. So if R dot value is greater than 65, then set to high, else, okay. So when I run my data set through this map function, I get these tables back. Every row has been extended with that state column and the value based on the existing value in the value column.
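A sketch of the conditional mapping described above. The bucket name is an assumption, and the threshold is written as `65.0` on the assumption that the field holds float values.

```flux
// Assumed bucket name; measurement and field follow the earlier description.
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")
    // Add a state column whose value depends on the existing _value column.
    |> map(fn: (r) => ({r with state: if r._value > 65.0 then "high" else "ok"}))
```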
Scott Anderson: 00:32:55.007 Now, when we start to do things like mean or other aggregate functions — that’s what these are, they’re aggregates — we need to understand how these transformations and these aggregates work. So there’s actually two types of transformations. Or I guess there’s three types. But the two important distinctions to understand is that there’s aggregates and there’s selectors. And the way that these behave is unique. So mean is an aggregate function. So when I look at my stream of tables, I have multiple tables with varying number of rows, and I pipe them through a mean function. The mean function operates on every table in the stream of tables individually and it aggregates only values within each of those tables. For every input table into an aggregate transformation, you get a single output table with a single row where, in this case, the value column will be the average of all the values in that table.
Scott Anderson: 00:34:05.020 So if this is our input, the resulting output will be still two tables with the average value. One thing to notice here is that the time column was dropped. With aggregate functions, any columns that are not in the group key, other than the value column, are dropped from the table because the aggregate functions don’t know how to aggregate those values. So when you have multiple timestamps that are aggregated into a single value, it doesn’t know which timestamp to use. In this case, you can just duplicate either your start or your stop time to be the new timestamp of that row. But it’s totally up to you. It’s just important to understand that, with aggregate functions for each input table, you get a single output table with a single row. If you want to take an entire stream of tables that is currently grouped into or partitioned into separate tables, what you have to do is you have to ungroup them. So you pass them through the group function with an empty array of columns so everything is in a single table and then you can average all of those values. Because again, for every input table in a stream of tables, you get an output table with a single row.
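The aggregate behavior described above can be sketched like this. The bucket name and the `yield()` names are assumptions added for the example.

```flux
// Assumed bucket name; measurement and field follow the earlier description.
data =
    from(bucket: "example-bucket")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")

// One output row per input table; _time is dropped by the aggregate.
data
    |> mean()
    |> yield(name: "mean_per_table")

// Restore a timestamp by copying _stop into a new _time column.
data
    |> mean()
    |> duplicate(column: "_stop", as: "_time")
    |> yield(name: "mean_with_time")

// Ungroup first (group() with no columns gives an empty group key)
// to average all values across the whole stream as a single table.
data
    |> group()
    |> mean()
    |> yield(name: "overall_mean")
```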
Scott Anderson: 00:35:24.790 So sum works the same way as mean. Just performs a different operation. It sums all the values in the table. And then we have max. So max isn’t an aggregate transformation, it’s a selector transformation where it evaluates each row in a table and it determines which has the highest value in the value column. So using that same data, a selector transformation will return one and, in some cases, more than one row, depending on the selector you use, that meet the criteria for that specific selector function. So in this case, it’s choosing the maximum value from each table. And the thing to notice here is that selectors do not modify row data. They simply return the row as is that meets the criteria of the selector transformation. So another example is last, which just returns the last row inside of each input table.
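For comparison with aggregates, here is a short sketch of the two selectors mentioned above, again assuming a bucket named `example-bucket`.

```flux
// Assumed bucket name; measurement and field follow the earlier description.
data =
    from(bucket: "example-bucket")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "mem" and r._field == "used_percent")

// Selector: returns the row with the highest _value from each input table.
// The row is returned as is, so columns such as _time are kept.
data
    |> max()
    |> yield(name: "max")

// Selector: returns the last row of each input table.
data
    |> last()
    |> yield(name: "last")
```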
Scott Anderson: 00:36:24.196 So let’s look at a few examples of Flux in the real world. So this is one, a Flux query, that is gathering market data or assumes market data is being gathered inside of a market summary bucket. We’re pulling all the data from the last six months using the range function. We’re then filtering by stock prices. And in this case, if you haven’t been watching the stock market, we’re looking at the GameStop ticker price. We then can pipe that data into a predictor function, which is another type of function available in Flux. Holt-Winters is an algorithm that allows you to predict future trends based on the seasonality of data that’s queried. So we can actually use the Holt-Winters function and apply that algorithm to predict future stock prices for GameStop. I don’t recommend this right now. I don’t know that the stock is in a state to provide accurate predictions. But if you’re looking to do any type of prediction with data that you have, this is a good example of that.
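A rough sketch of the kind of query described above. Only the six-month range and the use of `holtWinters()` come from the talk; the bucket, measurement, field, and tag names, and the `n`, `seasonality`, and `interval` values are all assumptions chosen for illustration.

```flux
// All names and parameter values below are hypothetical.
from(bucket: "market-summary")
    |> range(start: -6mo)
    |> filter(fn: (r) => r._measurement == "stock-prices" and r._field == "price")
    |> filter(fn: (r) => r.symbol == "GME")
    // Predict n future values, treating the data as spaced one day apart.
    |> holtWinters(n: 10, seasonality: 4, interval: 1d)
```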
Scott Anderson: 00:37:41.851 This is an example of using temperature and humidity data to calculate a heat index. So there is a really long calculation for heat index. And it’s one that actually I did prototype, but it’s too big to fit on a slide. So you just get the summary dots here, the ellipses, that say there is an operation that will take temperature and humidity and output a heat index. So in this case, I’m assuming that inside of my sensor data bucket I have both a temperature field and a humidity field. So taking all the data from the last day, filtering by measurement sensors, and then I’m filtering it again by the temp field or the humidity field. So all the rows inside of my query data that have one of those field keys will be returned. This function uses pivot to pivot the data or pivot the fields into columns.
Scott Anderson: 00:38:49.859 So when you’re operating specifically with map, any values that you use inside of a map call need to exist in the row record. So you can’t take — unless you define them as a variable external to the map function. But if you want to operate on two fields, you have to query both of those fields and then pivot those fields into columns so that you have a field column inside of each record that gets processed. So in this case, what that pivot function is doing is, based on the timestamp, it will then pivot the temp field and the humidity field into columns for each row. So each row will have a temp column and a humidity column with the field value inside of that column. And then you use map to extend each of the rows with a new column for heat index. And I use my heat index function with the T parameter and the H parameter using the values from each row. So r.temp, r.hum, and it will output a new table with a heat index column.
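The pivot-then-map pattern described above might be sketched as follows. The real heat index calculation was omitted from the slide, so the body of `heatIndex` here is a placeholder expression only and is NOT the actual formula. The bucket name `sensor-data`, the measurement name `sensors`, and the output column name `heatIndex` are also assumptions.

```flux
// Placeholder body: stands in for the long calculation elided on the slide.
// This is not the real heat index formula.
heatIndex = (t, h) => t + h * 0.1

from(bucket: "sensor-data")
    |> range(start: -1d)
    |> filter(fn: (r) => r._measurement == "sensors")
    |> filter(fn: (r) => r._field == "temp" or r._field == "hum")
    // Pivot field keys into columns so each row has both temp and hum.
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
    // Extend each row with a new column computed from both fields.
    |> map(fn: (r) => ({r with heatIndex: heatIndex(t: r.temp, h: r.hum)}))
```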
Scott Anderson: 00:40:06.646 So this is where things start to get really fun. One thing that Flux allows you to do is not just join data, but join data from disparate data sources. So one thing I haven’t covered in this presentation is just how you import packages. Flux ships with a standard library, but it also ships with packages that are technically part of the standard library, but they aren’t loaded by default. So to load a package that isn’t already loaded, you just have an import statement at the beginning of your Flux script that will import that package. So in this case, I want to import the SQL package because I am going to query data from a SQL data source, which in this case is Postgres. So I’m defining a sensor info variable and I’m querying sensor data from Postgres.
Scott Anderson: 00:41:05.863 So the type of data or sensor data you would typically store in Postgres, you’d almost use it as an asset management database. So you’re storing relational data for each of those sensors. The type of data that you wouldn’t necessarily store inside of InfluxDB just because of cardinality concerns and things like that. But useful information that you might want to associate with your time-series data stored in InfluxDB. So I have my sensor info variable and then I have a sensor metrics variable that queries data from InfluxDB, and it’s pulling all the data from my air sensors measurement over the last hour. So then this join function takes my two streams of tables, sensor metrics, and sensor info, and it will append. So if you look in that join call you have the tables parameter with a record. The keys in that record will be appended to all the column names that are shared — excuse me. That aren’t shared between the two tables.
Scott Anderson: 00:42:13.299 So this will create an inner join, where based on the sensor ID, you will get new rows with all of the data that share a common sensor ID between the sensor metrics and the sensor info streams of tables. I then group that data by the column sensor ID and last inspected. So in this case, I’m seeing if there’s any trends between data or between sensors that may have been inspected too long ago. So the aggregate window function is a really powerful data shaping function and processing. What it does is it splits data or groups data into windows of time and then applies an aggregate to each window of time and then ungroups those into a single table again. So in this case, it’s going to give me a count of columns for every five-minute window. So that’s a really basic overview of Flux and the Flux syntax. Thank you again for the time that you’ve spent and it looks like we have a few minutes for some Q&A.
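[Editor's sketch] The join example described in the last three segments might look something like this. The connection string, SQL query, bucket name, measurement name, and column names are placeholders assumed for illustration; only the overall shape (import, `sql.from()`, `join()`, `group()`, `aggregateWindow()`) follows what Scott describes.

```flux
import "sql"

// Relational sensor metadata from Postgres (connection details are placeholders).
sensorInfo = sql.from(
    driverName: "postgres",
    dataSourceName: "postgresql://user:password@localhost",
    query: "SELECT * FROM sensors",
)

// Time series data from InfluxDB for the last hour (bucket and measurement assumed).
sensorMetrics = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "airSensors")

// Inner join on the sensor ID, then regroup and count rows per five-minute window.
join(tables: {metric: sensorMetrics, info: sensorInfo}, on: ["sensor_id"])
    |> group(columns: ["sensor_id", "last_inspected"])
    |> aggregateWindow(every: 5m, fn: count)
```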
Caitlin Croft: 00:43:34.220 Fantastic. Thank you, Scott. For everyone who has joined, this has been a really great overview of Flux. If you’re interested in learning more, we do have a hands-on class at InfluxDays. So if you want to check that out and maybe register for it, I have thrown the link into the chat. All right. So the first question is, the tables created as part of group functions, are they virtual or physical tables?
Scott Anderson: 00:44:04.158 That is a great question. They are virtual. Flux doesn’t create any side effects until you tell it to create side effects. So you actually write that data. When you’re grouping that data, again, the data source is going to return it structured in a certain way. But that’s all in flight. It’s all in memory. So it’s all virtual. There’s nothing physical about it until you actually write that structured data back to a data source. So great question.
Caitlin Croft: 00:44:36.325 Can you show an example of how to run a Flux query in the CLI?
Scott Anderson: 00:44:41.138 Yeah. I could. [laughter] I’m a little unprepared. So there’s two commands that you can do this with. I mean — if you’re querying InfluxDB you can use the influx query command. Let me see if I can pull that up real quick. I wasn’t prepared to share my other screen. [laughter]
Caitlin Croft: 00:45:06.563 Thought I’d throw a curveball at you, Scott. [laughter]
Scott Anderson: 00:45:09.247 Oh, you’re throwing a curveball. One second.
Caitlin Croft: Is it okay if I ask you another question while you get that set up?
Scott Anderson: 00:45:16.467 Sure.
Caitlin Croft: 00:45:17.063 After group and aggregate, can I limit only top N results?
Scott Anderson: 00:45:24.654 After group and aggregate? Oh, yeah. Yeah. You can use the first function to do it. There’s also a limit function where you specify the N number of records you want to return, the rows that you want to return, for each input table. If you want to limit based on the entire stream of tables, you just have to ungroup all of the data points so that the stream of tables is a single table and then run that same limit or last or first or however you want to process that data to get those specific values that you’re looking for.
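[Editor's sketch] To make the difference between limiting per table and limiting across the whole stream concrete, here is a minimal example. The bucket, measurement, field, and tag names are assumptions, not something shown in the webinar.

```flux
// Hypothetical data: one table per host, averaged into five-minute windows.
data = from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    |> group(columns: ["host"])
    |> aggregateWindow(every: 5m, fn: mean)

// limit() applies to each input table: up to 3 rows per host.
data
    |> limit(n: 3)
    |> yield(name: "per_table")

// group() with no columns merges everything into one table,
// so limit() now returns up to 3 rows in total.
data
    |> group()
    |> limit(n: 3)
    |> yield(name: "whole_stream")
```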
Caitlin Croft: 00:46:01.197 Great. Can we store the Flux output to a measurement? Is that possible in InfluxDB Enterprise?
Scott Anderson: 00:46:09.993 InfluxDB Enterprise, I don’t believe — oh, that’s not true. So, yes. There’s a few things to understand. So with InfluxDB Enterprise — actually, no, you can’t because the to() function is built for — so the to() function is what writes data back to InfluxDB. Right now, the to() function is only built for InfluxDB 2.0. You can’t actually write it back to InfluxDB Enterprise directly in Flux. I don’t think that would be too hard to do. I think we would just need to add a to() function built against the 1.x API. Both of them just generate line protocol and [inaudible] protocol back to InfluxDB. It’s just the to() function, you have authentication credentials for 2.0, like the org, the token, and the hosts that are required to write that data. Where with 1.x, you don’t have that. So if that’s a Q&A question, I would encourage you to go create an issue on the Flux repo to add a to() function that can write to 1.x. That would be a handy thing.
Caitlin Croft: 00:47:29.342 How can I query data points from sensor data with the following conditions? Values within timeframe from X to Y. If not available, nearest possible point. But if that one is older than 24 hours, return nothing.
Scott Anderson: 00:47:47.809 So that’s tricky. There’s a few things here that — one of the things that I wish we supported, and I’ve lobbied for it and it hasn’t gotten a ton of traction internally yet, but just the ability to — if a range or a value doesn’t exist inside the query range and you’re looking for the last reported value, it would be nice if we could retrieve that value somehow. Right now, InfluxDB doesn’t let you do that, and Flux doesn’t let you do that. You’re bound to whatever time range you specify in the query. So if points exist outside of that, Flux doesn’t know about it, there’s no API in InfluxDB to retrieve that last reported value. Hopefully, that will make it in the future. I know it’s been heavily requested from the community. So the question kind of hinges on that. So that answers part of that question. So values within a timeframe from X to Y, the range function will do that. Your X time stamp will be the start parameter, your Y timestamp will be the stop parameter, and it will only return values within that time range. So there is no way to find the nearest available point or limit that by a time range, so. Great question. So would be really helpful functionality if it were added to InfluxDB and we’re lobbying for it. It just hasn’t been made a priority yet, so.
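[Editor's sketch] The part of the question that `range()` does cover, values between X and Y, can be expressed with explicit start and stop times. The bucket name, measurement name, and timestamps below are placeholders.

```flux
from(bucket: "sensor-data")
    // X is the start parameter, Y is the stop parameter (placeholder timestamps).
    |> range(start: 2021-04-28T00:00:00Z, stop: 2021-04-29T00:00:00Z)
    |> filter(fn: (r) => r._measurement == "sensors")
```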
Caitlin Croft: 00:49:26.255 Cool. Let’s see. I would like to know if I can calculate the 15-minute average every 15 minutes? Is it faster to run that every 15 minutes or once an hour using the aggregate window?
Scott Anderson: 00:49:44.178 That is a great question. So this is kind of an opaque answer. It completely depends on your data. [laughter] I think the lighter approach to each of these would be to run that query every 15 minutes. But that, too, has some negative consequences. You have to do that through an InfluxDB task. Not that the task engine is heavy, but just running that as a task and running tasks often can cause queuing in tasks, depending on if you’re in OSS or if you’re using [inaudible] or if you’re in cloud. And I believe cloud rate limits task executions, but don’t quote me on that. So it just depends on your data and how heavily you’re using tasks. But when it comes to an optimized query, the more optimized approach from a query perspective would be to run that same query every 15 minutes rather than an hour to aggregate that. Now, if you’re looking at cumulative performance, I don’t know that I can answer because I haven’t tested it. But I mean, how long does running the same query every 15 minutes, the cumulative time that takes, compare to the time it would take to run one query one hour and aggregate in that query each 15-minute window? I don’t know. I haven’t tested it. But if that’s a performance metric that you’re interested in, you certainly could test it.
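[Editor's sketch] The hourly variant of the two approaches being compared could be written as an InfluxDB task like the one below. The task name, bucket names, measurement, and field are assumptions. For the 15-minute variant, the same task would use `every: 15m` in the task option, so each run covers a single window.

```flux
// Runs once an hour and computes four 15-minute averages per run.
option task = {name: "mean-15m-hourly", every: 1h}

from(bucket: "sensor-data")
    // Query only the data since the previous run.
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "sensors" and r._field == "temp")
    // Split the hour into 15-minute windows and average each one.
    |> aggregateWindow(every: 15m, fn: mean)
    // Write the downsampled values to a separate (assumed) bucket.
    |> to(bucket: "sensor-data-15m")
```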
Caitlin Croft: 00:51:26.792 Let’s see. What is the impact of grouping data on a data set where there are billions of records, mostly from a performance standpoint?
Scott Anderson: 00:51:36.193 With billions of records? So lots of performance questions and this is great. Our Flux team, right now, their main priority is optimizing Flux. The primary means of optimizing Flux is through the use of pushdown functions. What these do is they actually push the loader operation down to the underlying data source rather than pulling all of that data into memory and operating on the data there. So, for example, if you’re querying billions of rows, you don’t want to load those billions of rows into memory. So what Flux lets you do is push certain operations down to the underlying storage engine. With InfluxDB, grouping is one of those pushdown functions. The tricky part with pushdown functions is that, once data is pulled into memory, it can’t be pushed back down. So once you use a non-pushdown function, the rest of the data has to be processed in memory. So you want to front-load all of your pushdown supported functions at the beginning of your query. So for grouping specifically, you would query InfluxDB. You would filter based on the data that you want. But immediately after that filter, the first operation you would do is you would group that data because the grouping would happen on the underlying storage engine rather than pulling all those rows into memory. When it comes to specific performance metrics about grouping billions of rows on the storage engine, I don’t have those. I couldn’t tell you. But it will definitely be more performant than pulling all that data into memory. Not just more performant, it’ll be possible. Where I don’t think it would be possible to pull billions of rows into memory unless you’re just running on a super-jacked machine. So, yeah, hopefully, that answers your question. Using pushdown functions to push operations down to the storage engine before data gets pulled into memory.
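[Editor's sketch] The ordering Scott recommends, grouping immediately after the filter and before anything that has to run in memory, might look like this. The bucket, measurement, tag, and the final `map()` step are assumptions for illustration; which functions are actually pushed down depends on the InfluxDB version, so treat the comments as a reading of the answer above rather than a guarantee.

```flux
from(bucket: "example-bucket")
    |> range(start: -30d)
    |> filter(fn: (r) => r._measurement == "example")
    // Group right after filtering so the storage engine can do the grouping.
    |> group(columns: ["host"])
    // Assumed non-pushdown step: from here on, data is processed in memory.
    |> map(fn: (r) => ({r with _value: r._value * 2.0}))
```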
Caitlin Croft: 00:53:46.225 Great. So we’re running out of time, but we will take a couple more questions. And if you have any more questions for Scott, you can all email me, and I’m happy to get him to help you there. Do you have plans to include message queues as a source for Flux or do we need to depend on Telegraf for this?
Scott Anderson: 00:54:07.645 That is a great question. I only know a little bit on just the input methods or planned input methods for Flux. I know message queues have been discussed. So Flux is interesting right now because all Flux queries are batch queries. It doesn’t support live streaming data. Where message queues are typically that. They’re streaming data that get pushed into Flux. So I think the real answer to this question is when Flux introduces streaming data support, then yes, that is an absolute necessary, viable use case, pulling data or receiving data from a message queue. Right now, Flux won’t do it directly. You would have to take that message queue, write those messages to a data source, and then query that data source to Flux. That is the current state of Flux. I know there has been lots of discussions around modifying Flux to handle streaming data, but right now, it does not. It only handles batch queries where you’re querying data from a data source. So, yeah.
Caitlin Croft: 00:55:26.327 Is there a way to do a top five query on a time series graph so that it would only return the top five tables of the query?
Scott Anderson: 00:55:35.288 Top five tables? No. So right now, if you’re looking at a stream of tables, Flux doesn’t let you limit the number of tables in that stream of tables. It can only operate on individual tables at a time. So when you say top five tables, how are you quantifying top? Is it the first tables in — the first five tables in a stream of tables, or is it using values inside of that table? So, yeah, I think the short answer is no. But if I’m understanding the question incorrectly, if you want to create separate tables, all with aggregate values, and then return only the top five values of all of the aggregate values, yes, that’s totally possible. You simply run the aggregate, ungroup the data, and then do a limit or top function to return those values.
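[Editor's sketch] The second reading of the question, the top five aggregate values across all tables, follows the aggregate, ungroup, top sequence Scott outlines. The bucket, measurement, field, and tag names are assumed.

```flux
from(bucket: "example-bucket")
    |> range(start: -1h)
    |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_user")
    // One table per host, reduced to a single mean value each.
    |> group(columns: ["host"])
    |> mean()
    // Ungroup into a single table so top() ranks across all hosts.
    |> group()
    |> top(n: 5)
```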
Caitlin Croft: 00:56:38.270 Awesome. Thank you so much, Scott. I apologize to everyone. I know there’s still a ton of questions. Everyone should have my email address. Feel free to email me if you have more questions. I’m happy to loop in Scott and he can help. The session has been recorded and will be made available for replay later today. And I hope to see you guys at the Flux training in a couple of weeks at InfluxDays. Thank you, everyone.
Senior Technical Writer, Tech Lead, InfluxData
Scott Anderson is a Technical Writer for InfluxData who thrives on distilling complex information into simple, easy-to-understand concepts. He oversees the documentation of the Flux language and loves the challenge of showing how Flux can work for your use case. Scott’s formal education is in graphic design, but he is a self-taught coder and applies skills and principles learned in both worlds to many different disciplines including programming and software development, information design, marketing, and even package design.