How to Use Starlark with Telegraf

Our Telegraf Starlark Processor Plugin is an exciting new processor in Telegraf 1.15 that gives you the flexibility of performing various operations in Telegraf using the Starlark language.

What is Starlark, you ask? Starlark (formerly known as Skylark) is a language intended for use as a configuration language. Starlark is a dialect of Python. Like Python, it is a dynamically typed language with high-level data types, first-class functions with lexical scope, and garbage collection. Independent Starlark threads execute in parallel, so Starlark workloads scale well on parallel machines. Starlark is a small and simple language with a familiar and highly readable syntax.

The Telegraf Starlark Processor calls a Starlark function for each matched metric, allowing for custom programmatic metric processing. Since the Starlark language is a dialect of Python, it will be familiar to those who have experience with Python. However, there are major differences, and existing Python code is unlikely to work unmodified. In addition, the execution environment is sandboxed and cannot access the file system, the network, or system resources.

Even with these restrictions, the Starlark processor has the following capabilities:

  • Math operations
  • String operations
  • Renaming tags
  • Logic operations

Using the Starlark processor, you can perform real-time aggregates (min, max) on your data in Telegraf before sending it on to InfluxDB. This helps reduce the load on your InfluxDB instance by distributing processing to your servers running Telegraf.

Since we’ve had a lot of requests for math capabilities within Telegraf, we’re glad to report that this plugin lets you perform arithmetic functions. A useful IoT example was requested by a community member who wanted to calculate power locally from the current and voltage fields read from the Telegraf Modbus Input Plugin. Here’s the Starlark function within the processor to do that:

[[processors.starlark]]
  # Starlark script embedded directly in the configuration
  source = '''
def apply(metric):
    # Get the field called current and put it into variable I
    I = metric.fields['current']
    # Get the field called voltage and put it into variable V
    V = metric.fields['voltage']
    # Create a new field, power, which is I times V
    metric.fields['power'] = I * V
    # Return the metric, now including power, to the Telegraf metrics stream
    return metric
'''

You can also put the above into a file, give it a “.star” file extension, and call it from your Telegraf configuration file as follows:

[[processors.starlark]]
 # File containing a Starlark script.
 script = "/usr/local/bin/myscript.star"
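
Because this particular function uses only syntax that Starlark shares with Python, its logic can be tried outside Telegraf before it is saved to a .star file. The following is a minimal sketch in plain Python, not Telegraf itself: FakeMetric is a hypothetical stand-in that models only the attributes used here, and the sample readings are made-up values.

```python
# Hypothetical stand-in for the metric object that Telegraf passes to apply().
# It models only the attributes this sketch touches.
class FakeMetric:
    def __init__(self, name, fields):
        self.name = name
        self.fields = dict(fields)


# The same function body that would go into the .star file.
def apply(metric):
    I = metric.fields['current']
    V = metric.fields['voltage']
    metric.fields['power'] = I * V
    return metric


# Made-up sample readings: 2.5 A at 230 V.
sample = FakeMetric('modbus', {'current': 2.5, 'voltage': 230.0})
result = apply(sample)
print(result.fields['power'])  # 575.0
```

A check like this only exercises the arithmetic; the real metric object and the sandbox restrictions still apply once the script runs inside Telegraf.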

How to run multiple Starlark functions on Telegraf data

Only one Starlark source or script can be run per processor. So, if you would like to apply multiple Starlark functions to your data, use multiple processors and include namepass = ["measurement_name"] in each one so that all the processors act on the specific data set you’re targeting. Here’s what that looks like:

[[inputs.cpu]]

[[processors.starlark]]
  namepass = ['cpu']
  # First Starlark script
  source = '''
def apply(metric):
    return metric
'''

[[processors.starlark]]
  namepass = ['cpu']
  # Second Starlark script
  source = '''
def apply(metric):
    return metric
'''
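
Conceptually, each processor hands the metric it returns to the next one. The sketch below models that hand-off in plain Python, assuming the two processors run in the sequence shown. FakeMetric, add_busy, flag_high, run_chain, and the field values are all hypothetical names and data chosen for illustration, not part of Telegraf.

```python
# Hypothetical stand-in for a Telegraf metric; only name and fields are modeled.
class FakeMetric:
    def __init__(self, name, fields):
        self.name = name
        self.fields = dict(fields)


# Plays the role of apply() in the first processor: derive a combined field.
def add_busy(metric):
    metric.fields['usage_busy'] = (
        metric.fields['usage_user'] + metric.fields['usage_system']
    )
    return metric


# Plays the role of apply() in the second processor: it relies on the field
# created by the first one.
def flag_high(metric):
    metric.fields['busy_high'] = metric.fields['usage_busy'] > 80
    return metric


# Pass the metric through each function in turn, stopping if one drops it.
def run_chain(metric, processors):
    for process in processors:
        metric = process(metric)
        if metric is None:
            return None
    return metric


cpu = FakeMetric('cpu', {'usage_user': 30.0, 'usage_system': 55.5})
out = run_chain(cpu, [add_busy, flag_high])
print(out.fields['usage_busy'], out.fields['busy_high'])  # 85.5 True
```

If a later script depends on a field created by an earlier one, as flag_high does here, it is worth confirming how your Telegraf version orders processors.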

Here are more examples of the Starlark Processor in action.

(Image: Starlark examples. Credit: Roman Mager via Unsplash)

How to calculate a percentage ratio in Telegraf

Use this Starlark code to calculate the ratio of two integers, then multiply by 100 to get a percentage. The ratio will get returned as part of the Telegraf metric stream.

[[processors.starlark]]
  source = '''
# Compute the ratio of two integer fields.
def apply(metric):
    used = float(metric.fields['used'])
    total = float(metric.fields['total'])
    # Create a new field called usage.
    metric.fields['usage'] = (used / total) * 100
    # Return the metric, now including usage, to the Telegraf metrics stream
    return metric
'''
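
One case the ratio does not cover is a total of zero, which would make the division fail. The following plain-Python sketch shows one possible guard, under the assumption that leaving the metric unchanged is acceptable when no ratio can be computed. FakeMetric and the sample numbers are hypothetical.

```python
# Hypothetical stand-in for a Telegraf metric; only name and fields are modeled.
class FakeMetric:
    def __init__(self, name, fields):
        self.name = name
        self.fields = dict(fields)


def apply(metric):
    used = float(metric.fields['used'])
    total = float(metric.fields['total'])
    # Assumption: when total is zero, pass the metric through without a
    # usage field rather than dividing by zero.
    if total == 0:
        return metric
    metric.fields['usage'] = (used / total) * 100
    return metric


normal = apply(FakeMetric('mem', {'used': 25, 'total': 200}))
empty = apply(FakeMetric('mem', {'used': 0, 'total': 0}))
print(normal.fields['usage'])     # 12.5
print('usage' in empty.fields)    # False
```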

How to scale metrics in Telegraf using Starlark

A common need is to take metrics that are in bytes, and scale them to kilobytes, megabytes, gigabytes, or terabytes. Here’s how to use Telegraf Starlark Processor to convert bytes to kilobytes:

[[processors.starlark]]
  source = '''
# Convert bytes to kilobytes
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "int":
            # 1000 bytes in a kilobyte
            metric.fields[k] = v / 1000
    return metric
'''

Here’s how to use Telegraf Starlark Processor to convert bytes to megabytes:

[[processors.starlark]]
  source = '''
# Convert bytes to megabytes
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "int":
            # 1000000 bytes in a megabyte
            metric.fields[k] = v / 1000000
    return metric
'''

And here’s how to use Telegraf Starlark Processor to convert bytes to gigabytes:

[[processors.starlark]]
  source = '''
# Convert bytes to gigabytes
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "int":
            # 1000000000 bytes in a gigabyte
            metric.fields[k] = v / 1000000000
    return metric
'''

(If you prefer to use base-2 values to convert your storage, then you’re talking mebibytes, gibibytes, etc. This article explains the difference and how to calculate them.)
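
For the base-2 case, only the divisor changes. Here is a rough plain-Python sketch of the arithmetic, assuming integer byte counts should be scaled and everything else left alone. The names KIB, MIB, GIB, and scale_fields, along with the sample fields, are hypothetical.

```python
# Base-2 divisors, written as plain products so the same expressions
# can be carried over to a Starlark script.
KIB = 1024
MIB = 1024 * 1024
GIB = 1024 * 1024 * 1024


# Return a copy of the fields with integer values divided by the divisor.
# Booleans are excluded explicitly because Python treats bool as an int.
def scale_fields(fields, divisor):
    scaled = {}
    for k, v in fields.items():
        if isinstance(v, int) and not isinstance(v, bool):
            scaled[k] = v / divisor
        else:
            scaled[k] = v
    return scaled


fields = {'disk_total': 5368709120, 'host_up': True, 'label': 'sda'}
in_gib = scale_fields(fields, GIB)
print(in_gib['disk_total'])  # 5.0
```

The same byte count divided by 1000000000 comes out larger than 5, which is the gap between gigabytes and gibibytes that the parenthetical above refers to.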

One last example: sometimes it’s helpful to scale metrics to ensure that they display correctly in a chart. Here’s how you can use the Telegraf Starlark Processor to scale up a number by 10:

[[processors.starlark]]
  source = '''
# Multiply any float fields by 10
def apply(metric):
    # k stands for key, v for value
    for k, v in metric.fields.items():
        if type(v) == "float":
            metric.fields[k] = v * 10
    return metric
'''

Transform values in Telegraf using Starlark

Suppose you need to transform one value to another in your Telegraf telemetry. For instance, F5 network devices return their status as an integer with values 0 through 5, where the value of 1 means healthy and the rest are variations of unhealthy status. Suppose I want to collapse this to a binary state — just one or zero — where one means healthy and zero means unhealthy, as shown in this table:

Original value    New value
0                 0
1                 1
2                 0
3                 0
4                 0
5                 0

Note that we don’t need to change anything if the values are zero or one; we only need to make changes if the values are 2 through 5. Here’s how I’d make those changes in Telegraf using Starlark:

[[processors.starlark]]
  namepass = ['measurementname']
  source = '''
def apply(metric):
    # v stands for value
    v = metric.fields.get('status')
    # If no value, do nothing:
    if v == None:
        return metric
    # When v is 2, 3, 4, or 5:
    if 1 < v and v < 6:
        metric.fields['status'] = 0
    return metric
'''
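
The condition can be checked against the table value by value. This is a small plain-Python sketch of that check; collapse_status is a hypothetical helper that repeats the same comparison on a bare number instead of a metric.

```python
# Same rule as the processor: 2 through 5 become 0, anything else is unchanged.
def collapse_status(v):
    if 1 < v and v < 6:
        return 0
    return v


# The mapping from the table: original value -> new value.
expected = {0: 0, 1: 1, 2: 0, 3: 0, 4: 0, 5: 0}
actual = {v: collapse_status(v) for v in expected}
print(actual == expected)  # True

# Values outside the documented 0-5 range pass through untouched.
print(collapse_status(6))  # 6
```

If a device could ever report a value outside 0 through 5, a stricter rule such as "anything other than 1 becomes 0" may be a safer choice.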

How to calculate the percent of healthy devices using Flux

Let’s continue with this network monitoring example. Performing this kind of transformation locally simplifies working with the data upstream. Because status is now always either 0 or 1, a simple calculation like sum(status)/count(status)*100 gives the percentage of healthy devices.

Using Flux, the query looks like this:

// First, create a variable called data
data = from(bucket: "device_data")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "network_devices" and r._field == "status")

// Create a sum of the data across one-minute windows
sum = data
  |> aggregateWindow(every: 1m, fn: sum)
  |> map(fn: (r) => ({ r with _field: "sum" }))

// Create a count of the data across one-minute windows
count = data
  |> aggregateWindow(every: 1m, fn: count)
  |> map(fn: (r) => ({ r with _field: "count" }))

// Create a union of the data, using the sum and the counts to calculate percent of healthy devices
union(tables: [sum, count])
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with
    _field: "percent_healthy",
    _value: (float(v: r.sum) / float(v: r.count)) * 100.0
  }))

In one pass, this Flux script collects the data and then uses it to calculate both the sum and the count, storing those values in separate variables. The two variables are then combined via a union, and the percent_healthy result is calculated, producing a single value per one-minute interval over the past 10 minutes.
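
To see why the binary status makes this formula work, here is the arithmetic for a single window as a plain-Python sketch. The status lists are made-up sample data and percent_healthy is a hypothetical helper, not part of Flux or Telegraf.

```python
# sum(status) / count(status) * 100 for one window of readings.
def percent_healthy(statuses):
    return (float(sum(statuses)) / float(len(statuses))) * 100.0


# After the Starlark transformation: three healthy devices out of four.
collapsed = [1, 0, 1, 1]
print(percent_healthy(collapsed))  # 75.0

# Before the transformation, the same window with a raw status of 3
# gives a number that is not a percentage at all.
raw = [1, 3, 1, 1]
print(percent_healthy(raw))  # 150.0
```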

How to rename tags in Telegraf using Starlark

If you need to rename tags in your line protocol, here’s how you’d use Starlark:

[[processors.starlark]]
  source = '''
# Rename any tags using the mapping in the renames dict.
renames = {
    'lower': 'min',
    'upper': 'max',
}

def apply(metric):
    # k stands for key, v for value
    for k, v in metric.tags.items():
        if k in renames:
            metric.tags[renames[k]] = v
            metric.tags.pop(k)
    return metric
'''
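
The effect of the mapping is easiest to see on a concrete tag set. Below is a minimal plain-Python sketch that works on an ordinary dict rather than a Telegraf metric; rename_tags and the sample tags are hypothetical. The Python version iterates over a copy of the items because a Python dict cannot change size while it is being iterated.

```python
renames = {
    'lower': 'min',
    'upper': 'max',
}


# Move each tag listed in renames to its new key, leaving other tags alone.
def rename_tags(tags):
    # list(...) takes a snapshot so keys can be added and removed in the loop.
    for k, v in list(tags.items()):
        if k in renames:
            tags[renames[k]] = v
            tags.pop(k)
    return tags


tags = {'host': 'server01', 'lower': '0', 'upper': '10'}
print(rename_tags(tags))  # {'host': 'server01', 'min': '0', 'max': '10'}
```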

How to use conditional logic with Telegraf and Starlark

Starlark also supports changing metrics only if certain logical conditions are met. For example, suppose you wanted to set the status field to zero whenever its value is greater than 1 and less than 6. Here’s how you’d do that with the Telegraf Starlark Processor:

[[processors.starlark]]
  source = '''
# Set any 'status' field greater than 1 and less than 6 to a value of 0
def apply(metric):
    # v stands for value
    v = metric.fields.get('status')
    if v == None:
        # If there is no status field
        return metric
    if 1 < v and v < 6:
        # If status is greater than 1 and less than 6, set status to zero
        metric.fields['status'] = 0
    # Return your modified metric to the Telegraf metrics stream
    return metric
'''

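How to drop metrics based on a numeric condition

Here’s how to store metrics in InfluxDB only if they meet a particular numeric condition and drop the rest. In this example, a metric is kept when its value field is 10 or less; the same pattern works for other numeric checks, such as detecting a 500 error: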

def apply(metric):
    if metric.fields["value"] <= 10:
        return metric  # keep
    else:
        return None  # drop
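
Returning None is what removes a metric from the stream. The plain-Python sketch below imitates that behavior on a short list of metrics. FakeMetric, the measurement name, and the values are hypothetical, and the list comprehension is only a stand-in for what Telegraf does with the return value.

```python
# Hypothetical stand-in for a Telegraf metric; only name and fields are modeled.
class FakeMetric:
    def __init__(self, name, fields):
        self.name = name
        self.fields = dict(fields)


def apply(metric):
    if metric.fields["value"] <= 10:
        return metric  # keep
    else:
        return None  # drop


# Three made-up metrics; only those with value <= 10 should survive.
stream = [FakeMetric('requests', {'value': v}) for v in (3, 10, 42)]
kept = [m.fields['value'] for m in stream if apply(m) is not None]
print(kept)  # [3, 10]
```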

Demo of Starlark and Telegraf

If you’d like to see a demo of how to write Starlark scripts for Telegraf using Visual Studio Code, check out this demo video:

Conclusion

As you can see, there’s a lot you can do with the Starlark Processor. To see how it could enhance your data collection, check out the list of examples and uses for the Starlark Processor. And if you would like to read more about Starlark, take a look at the Starlark specification for details about the syntax and the functions you can use with this processor.

Got your own Starlark functions that you think others would find useful? Please share them in our public InfluxData Slack channel and InfluxDB community site. And feel free to post any questions you have there if you need help. Enjoy!