Getting Started Using Scripts with InfluxDB

This article was originally published in The New Stack and is reposted here with permission.

Using scripts with a time-series database helps developers streamline application development, scale workloads and build lean integrations.

Time-series data is everywhere, and that isn’t going to change. The nature of time-series data means that time-series workloads differ from most other data workloads. Given how prevalent time-series data is in our modern, connected world, it’s more important than ever to ensure that developers have the tools to manage it.

By supporting scripts, the InfluxDB time-series database helps developers streamline and democratize application development, scale time-series workloads, and build lean integrations that augment system or application functionality.

In the context of InfluxDB, scripts consist of Flux code. (Flux is InfluxDB’s scripting and query language; you can learn how to use it in the Flux documentation.) You can write Flux scripts on your local machine, then upload that code and install it in InfluxDB Cloud. Once in InfluxDB Cloud, you can share and leverage these scripts in different ways.

Save and Share Code in Teams

First off, scripts can be a great learning tool for development teams. Flux experts can write advanced Flux applications and save them as scripts. Other team members can then access that code and use it directly as it applies to their needs. Having access to these scripts means that team members building their Flux skills have a valuable resource to read, learn from and reuse in their own work.

You can also use scripts in conjunction with tasks. The InfluxDB task system is an automated platform for running queries on a specific schedule. Installing a script within a task means that the script executes every time the task runs on its schedule. This is helpful because you can install the same script on multiple tasks. When you update that script, the changes automatically apply to all the tasks that call the script. If you have lots of tasks, the ability to make changes in one place and instantly apply them across your system is a huge time saver.
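
For reference, a standalone Flux task is just a Flux query with a task option that sets the task’s name and schedule. A minimal sketch, with placeholder names and a placeholder schedule, looks like this:

// A minimal standalone task; the name, schedule and buckets are placeholders.
option task = {name: "example_task", every: 10m}

from(bucket: "example-source")
    |> range(start: -task.every)
    |> to(bucket: "example-copy")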

Dynamic Data Analysis

The ability to pass dynamic parameters into scripts streamlines development and makes systems easier to manage at scale. Let’s say you have a manufacturing facility where you need to maintain the temperature inside the building, so you set up temperature sensors all over it. You need to run the same check on every sensor, but the frequency of that check changes for different sensor models. The following script is the beginning of a simple temperature check operation. It pulls several dynamic parameters from the task that calls it.

import "slack"

sendSlackMessage = (text, color) =>
   slack.message(
       url: "https://hooks.slack.com/services/YOURHOOKHERE",
       token: "",
       channel: "team-sensors",
       text: text,
       color: color,
   )

from(bucket: "sensors")
    |> range(start: -task.every)
    |> filter(fn: (r) => r.sensor_id == params.sensor)
    |> map(
        fn: (r) => ({r with sentMessage:
            if r.temperature > 100 then
                string(v: 200 == sendSlackMessage(color: "danger", text: "ALERT: Temperature sensor on ${r.sensor_id} needs immediate attention: ${r.temperature}"))
            else if r.temperature > 80 then
                string(v: 200 == sendSlackMessage(color: "warning", text: "Temperature on sensor ${r.sensor_id} is WARN: ${r.temperature}"))
            else
                "false"
        }),
    )
    |> to(bucket: "SensorCheckLog")

We have two task examples here. The first checks the temperature of a sensor every five minutes. The task calls the check_temp script and passes in the sensor ID to check.

(Screenshot: influx script list)

The next task does the same thing, with the primary difference being that the task_10 task checks a different sensor at a different interval. This time, it’s every 10 minutes.

(Screenshot: influx task create)
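
For comparison, without a shared script each of these tasks would carry its own copy of the query logic. A rough sketch of that duplication, with hypothetical names, looks like this:

// Without a parameterized script, each sensor/interval combination needs its
// own near-identical task definition.
option task = {name: "check_temp_sensor_a", every: 5m}

from(bucket: "sensors")
    |> range(start: -task.every)
    // Hard-coded sensor ID; a second task would repeat all of this for the
    // next sensor on a 10-minute schedule.
    |> filter(fn: (r) => r.sensor_id == "sensor_a")
    |> to(bucket: "SensorCheckLog")

With the shared check_temp script, that logic lives in one place and each task only supplies its sensor ID and schedule.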

Generating Value with Data and Scripts

Having the ability to dynamically insert data into scripts and tasks makes it easier to manage tasks and devices at scale. Using scripts in this way lets you quickly slice and dice your data in any way that provides value.

For instance, let’s say that you want to perform the same transformation on a data set at two different levels of granularity. The following script calculates a simple mean. The data source, data output location and window for aggregation are all dynamic values.

// script get_downsample
from(bucket: params.from)
    |> range(start: -task.every)
    |> aggregateWindow(every: params.period, fn: mean)
    |> to(bucket: params.to)

To use this script in a task, we need to call the script and pass in the parameters. Here, we’re selecting data from the raw_data bucket, taking the mean over a five-minute window and then sending that aggregated data to a new bucket called ds_5m.

(Screenshot: influx script list)

The following task performs the same operation on the same data, with some key differences. We’re getting coarser granularity for our mean here, averaging data over 15-minute windows. We want to keep this data separate for analysis, so we’ll send it to its own bucket, ds_15m.

(Screenshot: influx task create)

If it turns out that a basic mean operation doesn’t meet your needs, you can update the script with a more advanced data analysis algorithm, which the tasks will automatically start using. Again, this facilitates managing data sources and processing at scale.
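
For example, here is a hedged sketch of such an update, swapping the mean for a 95th-percentile aggregate while keeping the same parameters, so every task that calls get_downsample picks up the change automatically:

// script get_downsample, updated to report the 95th percentile per window
from(bucket: params.from)
    |> range(start: -task.every)
    |> aggregateWindow(
        every: params.period,
        fn: (column, tables=<-) => tables |> quantile(q: 0.95, column: column),
    )
    |> to(bucket: params.to)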

Smarter Alerting with Scripts

While scripts make it easier to scale systems, they can also help monitor those growing systems. You can use scripts to streamline alerting based on time-series data. Let’s say we build a policy for alerting the site reliability engineering (SRE) team when issues arise. One way to do this is to write out your alerting policy in a Flux script.

With a time-series database like InfluxDB, that policy may include steps such as loading messaging secrets. (InfluxDB has a secure vault for storing secrets, and Flux has calls to extract those secrets when needed.) Another policy element to consider is a process to eliminate duplicate alerts. Rather than having the same alert continuously pinging your endpoints, compare the current alert to the previous one to see whether the status changed. If not, there’s no need to send another alert.

The section of the script where you set up the actual endpoint(s) and criteria for where and when to send alerts is a useful place to use dynamic variables.

import "influxdata/influxdb"
import "influxdata/influxdb/secrets"
import "slack"
import "pagerduty"
import "array"

slackhook = secrets.get(key: "SLACK_HOOK")
pd_key = secrets.get(key: "PD_KEY")

sendSlackMessage = (text, color) =>
   slack.message(
       url: slackhook,
       token: "",
       channel: "your-channel",
       text: text,
       color: color,
   )

sendPagerDutyMessage = (
   service,
   level,
   summary,
   timestamp,
   key,
) =>
   pagerduty.sendEvent(
       class: "error_alert",
       client: "your-company",
       clientURL: "http://your.company.com",
       dedupKey: key,
       eventAction: pagerduty.actionFromLevel(level),
       group: service,
       routingKey: pd_key,
       severity: pagerduty.severityFromLevel(level),
       source: "ALERT_SRE",
       summary: summary,
       timestamp: timestamp,
   )

record = if params.level == "crit" then
        {
            pd_sent: sendPagerDutyMessage(
                key: params.pagerdutyDedupKey,
                level: params.level,
                service: "sre-services",
                summary: params.message,
                timestamp: now(),
            ).statusCode,
            slack_sent: 0,
            message: params.message,
            _time: now(),
            level: params.level
        }
    else if params.level == "warn" then
        {
            pd_sent: 0,
            slack_sent: sendSlackMessage(text: params.message, color: "warning"),
            message: params.message,
            _time: now(),
            level: params.level
        }
    else if params.level == "ok" then
        {
            pd_sent: sendPagerDutyMessage(
                key: params.pagerdutyDedupKey,
                level: params.level,
                service: "sre-services",
                summary: params.message,
                timestamp: now(),
            ).statusCode,
            slack_sent: sendSlackMessage(text: params.message, color: "good"),
            message: params.message,
            _time: now(),
            level: params.level
        }
    else
        // We shouldn't hit this case; it's included for syntactic completeness.
        {
            pd_sent: 0,
            slack_sent: sendSlackMessage(text: params.message, color: "good"),
            message: params.message,
            _time: now(),
            level: params.level
        }

array.from(rows: [record])
    |> set(key: "_measurement", value: "SRE_ALERTS")
    |> group(columns: ["level", "pd_sent", "slack_sent", "_measurement"])
    |> influxdb.wideTo(bucket: "AlertSeriesLog")

In the example above, when a task generates a critical alert, the script pulls in a custom message and sends that to a PagerDuty endpoint. If it’s a critical issue, it’s probably worth waking someone up to fix it. The warning alert sends a custom alert message to a group messaging platform like Slack. If the alert level returns to OK, you might want to set up alerts to both the on-call SREs and the wider team to let them know that the issue is resolved.

It’s also a good idea to write alert history data to a new bucket in InfluxDB, so make that part of your alerting policy. The alert history provides SREs with detailed info about events so they can track the major milestones of an issue. Alert history also informs the duplicate elimination process. Because we’re dealing with time-series data, it’s easy to look back at the previous alert and determine if it’s a duplicate or something new.
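
A sketch of that look-back in Flux, assuming the alert history lands in the AlertSeriesLog bucket as in the script above and that the alert level is queryable as a column, might look like this:

// Fetch the most recent alert from the history bucket and compare its level
// to the incoming one; only notify when the status actually changed.
// The one-day lookback window is an assumption, and the very first alert
// (an empty history) isn't handled here.
lastAlert = from(bucket: "AlertSeriesLog")
    |> range(start: -1d)
    |> filter(fn: (r) => r._measurement == "SRE_ALERTS")
    |> last()
    |> findRecord(fn: (key) => true, idx: 0)

statusChanged = lastAlert.level != params.level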

Expanding Possibilities with Invokable Scripts

Another helpful option is to install the alerting policy script as an invokable script. Installing a Flux script behind an invokable URL enables users to access their time-series data and InfluxDB from outside the platform, using an authenticated token.

This drastically streamlines app development because it allows developers to access time-series data without the need to install additional libraries or components into their code. They can simply call the URL endpoint.

So, to continue our example, putting a script behind an invokable URL enables external services or applications that support your application to use the same alerting policy by calling the script URL. These services don’t need to worry about the details of the policy. They just need to pass in parameters for warning level and a corresponding message.
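
As a sketch of what that call looks like, here is the invoke request expressed with Flux’s http/requests package; any HTTP client can make the same POST. The region URL, script ID, and API token below are placeholders:

import "http/requests"
import "json"

// Invoke the installed alerting script by ID, passing the alert level, message,
// and deduplication key the script expects. URL, script ID, and token are
// hypothetical values.
response = requests.post(
    url: "https://your-region.cloud2.influxdata.com/api/v2/scripts/000000000000000a/invoke",
    headers: ["Authorization": "Token MY_API_TOKEN", "Content-Type": "application/json"],
    body: json.encode(v: {params: {level: "warn", message: "Queue depth is growing on worker-3", pagerdutyDedupKey: "worker-3-queue"}}),
)

requests.peek(response: response)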

Conclusion

No matter how you plan to use time-series data, scripts can help you use it more efficiently and effectively. Scripts that take dynamic parameters facilitate easier scaling, because you can write scripts to do complex operations and simply reuse them across your entire solution. You can even expose scripts to external sources and build more complex and robust solutions with fewer lines of code.