Enriching Your Data with Kapacitor

Navigate to:

From time to time, we have seen requests from the community around querying InfluxDB based on a business period such as the typical business day or broken down into shift periods. Consider the following request: How do I summarize my data for the entire month of August just for business hours, defined by Monday through Friday between 0800AM and 0500PM? InfluxQL does not currently possess any functions for filtering based on time. We are limited to:

SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000';

So how do we accomplish this? The provided Telegraf plugins typically just send a timestamp value, and also have the ability to send some static tags in addition to the metrics and tags associated with the configured plugin. The solution is to use Kapacitor as a “pre-processor” to “decorate” or enrich your data with a computed value that represents a time period that you desire to query.

For the purposes of this article we are running on localhost for Telegraf, InfluxDB, and Kapacitor, but in a full-fledged environment these will be running on different hosts.

The first step is to configure Telegraf to write to Kapacitor instead of directly to InfluxDB. In the [[outputs.influxdb]] section of your telegraf.conf file, there are 3 key settings to consider:

urls = [http://localhost:9092]
database = "kap_telegraf"
retention_policy = "autogen"

The urls parameter must point to port 9092 (Kapacitor’s default listening port) instead of port 8086 for InfluxDB. The database parameter should point to a non-existent database (you can ignore Telegraf’s warning about the database not being found). The retention_policy parameter should be set to “autogen” or a specific retention policy that you have previously created in your instance.

CAUTION: Leaving retention_policy set to “” (default) is not the same as “autogen” which is specified as the default retention policy when InfluxDB is initialized.

All other settings in telegraf.conf can be configured normally for your instance.

The next step is to create a TICKscript that will process the data coming from Telegraf. In this example, we are interested in creating a tag that will contain a true or false value if the data point is during business hours that we described above.

  |eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))

In this TICKscript, we are streaming from the non-existent database “kap_telegraf” that we configured above in our telegraf.conf file. The .from() method for the stream() node only needs the database to match to. We then pass control to an eval() node that will evaluate if the point has arrived in the window that we have designed. In this case, we utilize the weekday(), hour() and minute() functions described at https://docs.influxdata.com/kapacitor/v1.3/tick/expr/#time-functions to evaluate the “time” value. The first part of the condition evaluates whether the day of the week falls between Monday (1) and Friday (5). The second part of the condition evaluates the hour value, being careful to consider that we want to stop at the “00” mark in hour 17 (5pm). To do that, we are multiplying the hour by 100 and adding the result of the minute() function to compare to the ending time of 1700. If the time value falls within this range, the eval() node returns true as a field called business_hours. However, since we want to query on this value, we should have it as a tag, so we chain a .tags() method to change the value to a tag called business_hours.

IMPORTANT: The eval() node will eliminate all other fields and tags from the stream, so we want to specify .keep() to retain these values from the stream.

At this point, we have both a field and a tag called business_hours that contains the output of the eval() node. We should filter this out of the stream by calling a delete() node that specifies the field to be deleted via the .fields() method.

Finally, we pass control in the stream to an influxDBOut() node that specifies the destination database and retention policy to write to. We have added an additional static tag for this example called kapacitor-augmented. All other data like measurement name is carried through, and it is only necessary to provide new information. In this case, we are going to write to the telegraf database and the autogen retention policy.

Once you have created the TICKscript (referenced below as businesshours.tick), we must tell Kapacitor that we want to run it. For this example, we will utilize the Kapacitor CLI to configure the task.

$ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick

This command defines the task called business_hours as a stream type listening for writes to kap_telegraf.autogen which is the database name and retention policy name that was configured in Telegraf above. Once we have successfully created the task, we need to enable it for processing by Kapacitor.

$ kapacitor enable business_hours

To show the status, we can ask Kapacitor to list the current tasks:

$ kapacitor list tasks
ID             Type      Status    Executing Databases and Retention Policies
business_hours stream    enabled   true      ["kap_telegraf"."autogen"]

Once data begins the flow through Kapacitor to InfluxDB, you can then add your condition AND business_hours='true' to the first query we specified:

SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' and time <= '2017-08-31 17:00:00.000000' AND business_hours='true';

In summary, we have shown a simple example of how to use Kapacitor to enrich your data coming from Telegraf into InfluxDB. This method could be used to add other types of data, potentially obviating the need to create custom Telegraf plugins to meet your business needs.