What the Pivot() is Going On with the MQTT Plugin?


The MQTT Consumer Plugin is one of our most widely used input plugins for Telegraf. If you need a little bit of background, then I highly recommend checking out the following:

I plan to release an MQTT best practices blog soon, but we thought this plugin partnership was too good not to talk about now.

Background

So if you have used Flux, you are probably familiar with the concept of pivoting your data. Essentially you are rotating values from a vertical format into a horizontal format more akin to a relational database:

[Figure: Before Pivot]

[Figure: After Pivot]
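If the rotation is hard to picture, here is a minimal Python sketch of the idea. It is not how Flux implements pivot(); the `pivot_rows` helper and the sample rows are hypothetical, and the column names simply follow the `_time`, `_field`, and `_value` convention used by InfluxDB.

```python
from collections import defaultdict


def pivot_rows(rows, row_key="_time", column_key="_field", value_column="_value"):
    """Rotate narrow (row_key, column_key, value) rows into one wide row per row_key."""
    wide = defaultdict(dict)
    for row in rows:
        # The value of the column_key column becomes a column name in the wide row.
        wide[row[row_key]][row[column_key]] = row[value_column]
    return [{row_key: key, **columns} for key, columns in wide.items()]


# Hypothetical sample data: one row per field per timestamp (the "vertical" format).
rows = [
    {"_time": 1, "_field": "temperature", "_value": 32.2},
    {"_time": 1, "_field": "speed", "_value": 20.0},
    {"_time": 2, "_field": "temperature", "_value": 32.5},
    {"_time": 2, "_field": "speed", "_value": 21.0},
]

wide_rows = pivot_rows(rows)
for row in wide_rows:
    print(row)
```

Each timestamp ends up as a single row with one column per field, which is the "horizontal" format described above.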

Pivoting with Telegraf

We can achieve a similar result with the MQTT Consumer and Pivot Processor plugins. Let’s consider the following example:

We have a fleet of construction vehicles working on a new road. Each vehicle sends health metrics to an MQTT broker. The topic structure for one vehicle, RoadRoller_1, looks like this:

RoadRoller_1/temperature
RoadRoller_1/speed
RoadRoller_1/vibration

Each subtopic (temperature, speed, vibration) produces a single value.

Let’s first create a config to read from these topics:

[[inputs.mqtt_consumer]]
  # ....
  topics = ["RoadRoller_1/#"]

  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "RoadRoller_1/+"
    measurement = "measurement/_"
    tags = "_/field"

The metrics produced from these topics would look like this:

RoadRoller_1,field=temperature value=32.2
RoadRoller_1,field=speed value=20
RoadRoller_1,field=vibration value=1.0752

Now let’s pivot():

[[inputs.mqtt_consumer]]
  # ....
  topics = ["RoadRoller_1/#"]

  data_format = "value"
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "RoadRoller_1/+"
    measurement = "measurement/_"
    tags = "_/field"

[[processors.pivot]]
  tag_key = "field"
  value_key = "value"

Here is our new result:

RoadRoller_1 temperature=32.2,speed=20,vibration=1.0752

As you can see, we have rolled these values into a single multi-field metric.
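To make the rotation concrete, here is a rough Python model of what happens to each metric. This is a simplified sketch, not Telegraf's code: `parse_topic`, `pivot`, and `merge` are hypothetical helpers, timestamps and the default host and topic tags are left out, and the `merge` step is an assumption that stands in for metrics of the same series being combined into one line.

```python
def parse_topic(topic, value):
    """Model of topic parsing: 'RoadRoller_1/speed' -> measurement plus a 'field' tag."""
    measurement, field = topic.split("/")
    return {"name": measurement, "tags": {"field": field}, "fields": {"value": value}}


def pivot(metric, tag_key="field", value_key="value"):
    """Model of the pivot step: the tag's value becomes the name of the field."""
    tags, fields = dict(metric["tags"]), dict(metric["fields"])
    if tag_key in tags and value_key in fields:
        # Drop the tag and the generic 'value' field, add a field named after the tag value.
        fields[tags.pop(tag_key)] = fields.pop(value_key)
    return {"name": metric["name"], "tags": tags, "fields": fields}


def merge(metrics):
    """Assumed step: combine metrics that share a name and tag set into one metric."""
    merged = {}
    for m in metrics:
        key = (m["name"], tuple(sorted(m["tags"].items())))
        target = merged.setdefault(
            key, {"name": m["name"], "tags": dict(m["tags"]), "fields": {}}
        )
        target["fields"].update(m["fields"])
    return list(merged.values())


# Sample readings taken from the example above.
readings = {
    "RoadRoller_1/temperature": 32.2,
    "RoadRoller_1/speed": 20.0,
    "RoadRoller_1/vibration": 1.0752,
}

pivoted = [pivot(parse_topic(topic, value)) for topic, value in readings.items()]
combined = merge(pivoted)
print(combined)
```

Note that in this model the pivot step works on one metric at a time. The single multi-field result only appears once the pivoted metrics share the same name and tags, which is why the `field` tag has to be consumed by the pivot.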

Ecosystem benefits

So what are some of the benefits of doing this?

Reduces the number of messages being sent: With our current example, this might seem trivial, but as you expand the number of MQTT sources you gather single values from, the number of metrics being written starts to add up. For example, a fleet of 150 construction vehicles each writing to 3 subtopics totals 450 metric lines. Pivoting reduces our metric line count to 150. Note: This is only effective when dealing with single-value topics that send data in parallel, or when using the Merge plugin.
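The arithmetic behind that fleet example is simple enough to check in a few lines of Python. The variable names are just illustrative, and the numbers assume every vehicle reports all three subtopics for each collection interval.

```python
vehicles = 150
subtopics_per_vehicle = 3

# One metric line per subtopic per vehicle when nothing is pivoted.
lines_without_pivot = vehicles * subtopics_per_vehicle

# One multi-field metric line per vehicle once the values are rolled together.
lines_with_pivot = vehicles

reduction = 1 - lines_with_pivot / lines_without_pivot
print(lines_without_pivot, lines_with_pivot, f"{reduction:.0%}")
```

The saving scales with the number of single-value subtopics per vehicle, so wider topic trees benefit more.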

A friendlier format for parsing: Telegraf can serialize data into a variety of formats — JSON, Wavefront, Line Protocol & Prometheus just to name a few. The problem comes when representing the metric in a usable way. Let’s take JSON:

Without parsing the topic field as a tag

{"fields":{"value":23},"name":"RoadRoller_1","tags":{"host":"foo","topic":"RoadRoller_1/speed"},"timestamp":1654697690}

The problem here is we lose the field name altogether. We would need to extract the field name from the topic. Not great.

Parsing the topic field as a tag but without pivoting

{"fields":{"value":23},"name":"RoadRoller_1","tags":{"field":"speed","host":"foo","topic":"RoadRoller_1/speed"},"timestamp":1654697690}

With this example, we can now reference the field name, though this is nested within the tags object.

Parsing with pivot

{"fields":{"speed":23},"name":"RoadRoller_1","tags":{"host":"foo","topic":"RoadRoller_1/speed"},"timestamp":1654697690}

Finally, with the pivot, we can now reference the field name and its value together as a key-value pair. This structure is more widely accepted within JSON parsing architectures.
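To see the difference from a consumer's point of view, here is a small Python sketch that pulls the same reading out of each of the three JSON shapes above. The helper names are hypothetical and the payloads are the sample documents from this post, so treat it as an illustration rather than a recommended client.

```python
import json

# The three sample payloads from above: no topic parsing, tag only, and pivoted.
unparsed = json.loads(
    '{"fields":{"value":23},"name":"RoadRoller_1",'
    '"tags":{"host":"foo","topic":"RoadRoller_1/speed"},"timestamp":1654697690}'
)
tagged = json.loads(
    '{"fields":{"value":23},"name":"RoadRoller_1",'
    '"tags":{"field":"speed","host":"foo","topic":"RoadRoller_1/speed"},"timestamp":1654697690}'
)
pivoted = json.loads(
    '{"fields":{"speed":23},"name":"RoadRoller_1",'
    '"tags":{"host":"foo","topic":"RoadRoller_1/speed"},"timestamp":1654697690}'
)


def reading_from_topic(metric):
    """No tag available: the field name has to be cut out of the topic string."""
    name = metric["tags"]["topic"].rsplit("/", 1)[-1]
    return {name: metric["fields"]["value"]}


def reading_from_tag(metric):
    """Tag available: the name and the value live in two different objects."""
    return {metric["tags"]["field"]: metric["fields"]["value"]}


def reading_from_pivot(metric):
    """Pivoted: the fields object already is the key-value pair."""
    return dict(metric["fields"])


print(reading_from_topic(unparsed), reading_from_tag(tagged), reading_from_pivot(pivoted))
```

All three helpers arrive at the same reading, but only the pivoted shape needs no knowledge of the topic layout or tag names.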

Conclusion

I hope this blog post gave you some insight into the power of processors and how they can shape your data for the better. So what next?

I am a strong believer in the power of having a go:

  • You can find an example Telegraf config here.
  • I have started to put together an MQTT Simulator with different scenarios here.

Also, learn more about collecting data with Telegraf by taking the free InfluxDB University Data Collection with Telegraf course. Get that badge!