InfluxDB as an IoT Edge Historian: A Crawl/Walk/Run Approach

This article was originally published in The New Stack.

The question of how to get data into a database is one of the most fundamental aspects of data processing that developers face. Data collection can be challenging enough when you’re dealing with local devices. Adding data from edge devices presents a whole new set of challenges. Yet the exponential increase in IoT edge devices means that companies need proven and reliable ways to collect data from them.

The following are three different approaches to collecting data from edge devices. Edge devices have different capabilities – processing power, memory capacity, connectivity, etc. – so finding the right solution for your edge computing use case may require a bit of trial and error. However, you can use these approaches as a jumping-off point for building your solution.

For context, we’re using InfluxDB as the processing and storage solution, and the cloud version of InfluxDB is the target destination here. Each edge device in these examples also runs the open source version of InfluxDB. We’re using the Flux language to create tasks that perform data transformations and annotations.
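
The bucket-creation commands below use the influx CLI and assume two CLI configurations: one pointing at the local edge instance and one, named "cloud2", pointing at the cloud instance. A minimal sketch of that setup, with placeholder URLs, orgs, and tokens:

# CLI config for the local edge instance (URL, org, and token are placeholders)
influx config create --config-name edge --host-url http://localhost:8086 --org my-edge-org --token $EDge_TOKEN --active

# CLI config for the cloud instance; the examples below switch to it with "influx config cloud2"
influx config create --config-name cloud2 --host-url https://us-west-2-1.aws.cloud2.influxdata.com --org my-cloud-org --token $CLOUD_TOKEN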

Crawl: Basic store and forward

This is the most basic setup for getting data from edge devices to an InfluxDB cloud database. Here, raw data is written directly from an edge instance of InfluxDB to a cloud instance.

The process for the basic store-and-forward approach looks like this:

  1. Create a bucket in each instance of InfluxDB running on the edge. We'll call the bucket "devices" and set a one-hour retention policy for that data.
  2. Create a bucket in your cloud instance of InfluxDB. We'll call this bucket "edge_devices" and set it up with a 30-day retention policy.
  3. Create a task in the edge instance of InfluxDB that pushes raw data from the "devices" bucket at the edge to the "edge_devices" bucket in the cloud every minute.
  4. Inject an "edge_id" tag into the incoming data, as shown in the task below.

# Create edge bucket "devices" with a one-hour retention policy
influx bucket create --name "devices" --retention 1h

# Create cloud bucket "edge_devices" with a 30-day retention policy
# (switch the CLI to the "cloud2" config first)
influx config cloud2
influx bucket create --name "edge_devices" --retention 30d

# Write a task that pushes raw data to the "edge_devices" bucket every minute
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "raw_northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_devices")

# Inject an edge_id tag - See line 20
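
The task above reads its cloud connection details from the edge instance's secret store, and you can register the script itself as a task with the influx CLI. A minimal sketch, assuming the CLI points at the edge instance and the script is saved as raw_northbound.flux (the values and file name are placeholders):

# Store the cloud connection details referenced by secrets.get()
influx secret update --key cloud_host --value "https://us-west-2-1.aws.cloud2.influxdata.com"
influx secret update --key cloud_org --value "my-cloud-org"
influx secret update --key cloud_token --value "my-cloud-api-token"

# Register the Flux script above as a task on the edge instance
influx task create --file raw_northbound.flux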

Advantages

  • Quick and easy to set up.
  • Requires little edge computing power.

Disadvantages

  • Increases the amount of unclean data in the cloud instance.
  • Requires more throughput on the edge data connection.

Walk: Downsample and forward

In this approach, raw data is downsampled in memory at the edge, and the downsampled data is pushed to the cloud, where it is written into InfluxDB.

The process for the downsample-and-forward approach looks like this:

  1. Create a bucket in each instance of InfluxDB running on the edge. Again, we'll call the bucket "devices" and set a one-hour retention policy for that data.
  2. Create a bucket in your cloud instance of InfluxDB. We'll call this bucket "edge_aggregate" and set it up with a 30-day retention policy.
  3. Create a task in your edge instance of InfluxDB that downsamples the raw data from the "devices" bucket at the edge and sends the downsampled data to the "edge_aggregate" bucket in the cloud every minute.
  4. Inject an "edge_id" tag into the incoming data, as shown in the task below.

# Create edge bucket "devices" with a one-hour retention policy
influx bucket create --name "devices" --retention 1h

# Create cloud bucket "edge_aggregate" with a 30-day retention policy
# (switch the CLI to the "cloud2" config first, as in the previous example)
influx config cloud2
influx bucket create --name "edge_aggregate" --retention 30d

# Write a task that downsamples data and pushes it to the "edge_aggregate" bucket every minute
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "aggregate_northbound", every: 1m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_aggregate")

# Inject an edge_id tag - see line 20
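
mean is just one option for the aggregation function. If you need more than one summary statistic per window, a hypothetical variant of the task body could compute several aggregates and tag each stream before writing; the agg_type tag here is illustrative, not part of the original setup:

// shared source: everything written since the last successful run
data = from(bucket: "devices")
    |> range(start: tasks.lastSuccess(orTime: -1h))

mean_data = data
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
    |> set(key: "agg_type", value: "mean")

max_data = data
    |> aggregateWindow(every: 5m, fn: max, createEmpty: false)
    |> set(key: "agg_type", value: "max")

// merge both streams and push them north in a single write
union(tables: [mean_data, max_data])
    |> set(key: "edge_id", value: "001")
    |> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_aggregate")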

Advantages

  • Quick and easy to set up.
  • Reduces the amount of unclean, raw data sent to the cloud instance.
  • Reduces edge data connection throughput needs.

Disadvantages

  • Requires more edge computing power.
  • Requires more edge memory.
  • Requires edge storage.

Run: Robust edge historian

In this third example, we’re doing even more work on the edge. Here, the role of data historian effectively moves to the edge, because the edge instance of InfluxDB collects, processes, and stores the data itself.

As in the previous approach, the raw data is downsampled on the edge, but now we write that downsampled data to a second bucket on the edge instance as well. Finally, that saved, downsampled data is pushed to the cloud instance of InfluxDB.

  1. Create a bucket in each instance of InfluxDB running on the edge. Call the bucket "devices" and set a one-hour retention policy for that data.
  2. Create another bucket in InfluxDB on the edge and call it "northbound", with a one-day retention policy.
  3. Create a bucket in your cloud instance of InfluxDB. We'll call this bucket "edge_aggregate" and set it up with a 30-day retention policy.
  4. Create a task in your edge instance of InfluxDB that downsamples the raw data from the "devices" bucket at the edge and writes the downsampled data to the "northbound" bucket on the edge every minute.
  5. Create another task on the edge that pushes the data from "northbound" to "edge_aggregate" in the cloud every five minutes.
  6. Inject an "edge_id" tag into the incoming data, as shown in the first task below.

# Create edge bucket "devices" with a one-hour retention policy
influx bucket create --name "devices" --retention 1h

# Create edge bucket "northbound" with a one-day retention policy
influx bucket create --name "northbound" --retention 1d

# Create cloud bucket "edge_aggregate" with a 30-day retention policy
# (switch the CLI to the "cloud2" config first, as in the previous examples)
influx config cloud2
influx bucket create --name "edge_aggregate" --retention 30d

# Write a task that downsamples data into the "northbound" bucket every minute
import "influxdata/influxdb/tasks"

option task = {name: "aggregate_local", every: 1m, offset: 0s}

from(bucket: "devices")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> set(key: "edge_id", value: "001")
|> to(bucket: "northbound")

# Write a task that pushes from "northbound" to "edge_aggregate" every ~5 minutes
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "sync_northbound", every: 5m, offset: 0s}

cloud_host = secrets.get(key: "cloud_host")
cloud_org = secrets.get(key: "cloud_org")
cloud_token = secrets.get(key: "cloud_token")

from(bucket: "northbound")
|> range(start: tasks.lastSuccess(orTime: -1h))
|> to(host: cloud_host, org: cloud_org, token: cloud_token, bucket: "edge_aggregate")

# Inject an edge_id tag - see line 18
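
Because sync_northbound resumes from tasks.lastSuccess(), it automatically backfills whatever accumulated in "northbound" while the connection was down, as long as the outage is shorter than that bucket's one-day retention. You can check how the sync task is behaving with the influx CLI; the task ID below is a placeholder:

# Find the sync task's ID on the edge instance
influx task list

# Inspect recent runs and logs for that task (ID is a placeholder)
influx task run list --task-id 0123456789abcdef
influx task log list --task-id 0123456789abcdef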

Advantages

  • Quick and easy to set up.
  • Significantly reduces the amount of unclean, raw data sent to the cloud instance.
  • Reduces edge data-connection throughput needs.
  • Resilient to connectivity outages between cloud and edge.

Disadvantages

  • Requires more edge computing power.
  • Requires significantly more edge memory.
  • Requires edge storage capacity.

As we’ve noted, there are advantages and drawbacks to each of these approaches, so choose the one that makes the most sense for your tech stack and use case.

You can replicate or adjust these tasks across your distributed edge devices to maximize your available resources.
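
For example, rather than hard-coding the device identifier, one option is to store it in each edge instance's secret store and read it in the task, so the same script can be deployed unmodified across the fleet. A sketch of the "aggregate_local" task written that way (the edge_id secret key is an assumption, not part of the original setup):

# On each device, store its identifier once (value is a placeholder)
influx secret update --key edge_id --value "001"

# Shared task script, identical on every device
import "influxdata/influxdb/tasks"
import "influxdata/influxdb/secrets"

option task = {name: "aggregate_local", every: 1m, offset: 0s}

// read this device's identifier from its local secret store
edge_id = secrets.get(key: "edge_id")

from(bucket: "devices")
    |> range(start: tasks.lastSuccess(orTime: -1h))
    |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
    |> set(key: "edge_id", value: edge_id)
    |> to(bucket: "northbound")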

Regardless of the approach you choose, this type of solution can give you better observability of devices on the edge. It also helps streamline data processing across your entire ecosystem by reducing the size of the data set in the central storage instance.