Building a Data Stream for IoT with NiFi and InfluxDB

Apache NiFi has been a game changer in the world of IoT, allowing you to automate the transformation and flow of data from any IoT, edge, or sensor device to just about anywhere you want. In addition to supporting mission-critical data volumes with rigorous security, data provenance, and compliance requirements, NiFi provides developers with a drag-and-drop UI for flow-based programming and automatic “real-time” deployment of new flows.

By combining NiFi and InfluxDB, industries can easily make their IoT data streams securely accessible and usable. This solution gives the enterprise a single view of data across all facilities, enabling proactive maintenance, failure detection, and other business outcomes.

The IoT challenge: OPC-based data

For this example, we will look at OPC data from factory automation and process manufacturing. OPC gives production and business applications across the manufacturing enterprise access to real-time plant-floor information in a consistent manner. However, a common challenge with our enterprise clients is how to connect their facility or enterprise historians with a modern toolset and analytics pipeline while also guaranteeing continuous data flow between them.

Here’s where the NiFi & InfluxDB platform combination can provide a powerful solution in a matter of minutes to securely collect, observe and act on your facility data.

Building data stream for IoT

Get started with the NiFi-OPCUA-bundle

Begin by downloading the NiFi Service & Processor bundle from the Tempus IIoT framework for industrial data ingestion and analysis. These processors and the associated controller service give NiFi read-only access to OPC UA servers.

You will find two processors in the Tempus bundle: GetOPCNodeList and GetOPCData. GetOPCNodeList provides access to the tags currently in the OPC UA server, while GetOPCData takes a list of tags and queries the OPC UA server for their values.

Configuring the GetOPCNodeList processor

Start by adding the GetOPCNodeList processor to the NiFi canvas:

Get node list processor

After adding it, right-click the GetOPCNodeList processor and select Configure. On the PROPERTIES tab, click the value box of the OPC UA Service property, pick Create new service in the dialog that follows, and add a new instance of the StandardOPCUAService.

OPCUA Service Property

Next, configure the processor filter so it returns only the node-list tag items you’re interested in fetching. Use the Node Filter property to define a pipe (|) delimited list of regular expressions for the nodes.

Additionally, set the Recursive Depth property to control how many branches “deep” the processor will browse into the OPC server’s namespace.
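To make the filter behavior concrete, here is a minimal Python sketch of how a pipe-delimited regular expression list selects nodes. The node IDs and patterns are hypothetical examples, not values from the article:

```python
import re

# Hypothetical node IDs as they might appear when browsing an OPC UA namespace.
nodes = [
    "ns=2;s=Plant1.Pump01.Pressure",
    "ns=2;s=Plant1.Pump01.Status",
    "ns=2;s=Plant1.Tank03.Level",
]

# A pipe-delimited list of regular expressions, as the Node Filter property expects.
node_filter = r".*Pressure.*|.*Level.*"

# Keep only the nodes that match one of the patterns.
matched = [n for n in nodes if re.match(node_filter, n)]
print(matched)
```

Only the pressure and level tags survive the filter; the pump status tag is dropped.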

Configure processor

When you are done configuring the processor, click on the edit arrow icon to the right of the controller service. This will take you to the controller service configuration window.

Begin editing the StandardOPCUAService controller by entering the endpoint information of your OPC UA server:

Next, update the security settings handshake mode to match one of the available modes on your OPC-Server:

Configure controller service

Set the Authentication Policy Property to define how NiFi should authenticate with the UA server. Here, in addition to the Security Policy mode, I’ve added username/password credentials for access.

Authentication policy property

You now have the information needed to access the OPC-Server. Once the configuration of the controller is complete, click apply and enable the controller.

Configuring the GetOPCData processor

Next, add the GetOPCData OPC processor to your NiFi canvas. Right-click on the processor and select Configure from the context menu to configure the processor. Click on the PROPERTIES tab and fill out the information as below.

Get OPC data

Schema Registry service for record-based flow

The GetOPCData processor essentially produces CSV-style data (minus the header) in the format: tag name, timestamp, value, status.
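As a rough illustration of that output shape, the following Python sketch parses a couple of hypothetical GetOPCData lines into records (the sample values are invented for illustration):

```python
import csv
import io

# Hypothetical sample of GetOPCData output: tag name, timestamp, value, status (no header).
raw = (
    "ns=2;s=Plant1.Pump01.Pressure,1559260800000,42.7,0\n"
    "ns=2;s=Plant1.Tank03.Level,1559260800000,7.3,0\n"
)

records = []
for uri, ts, value, status in csv.reader(io.StringIO(raw)):
    # Field names mirror the opcData Avro schema used in this flow.
    records.append({"uri": uri, "ts": ts, "value": float(value), "status": int(status)})

print(records[0])
```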

Schema registry service

We’ll now add the AvroSchemaRegistry controller service. This tells the record-based processors how to interpret the OPC Tag items.

That service is added the same way we added the StandardOPCUAService. Click on the edit arrow icon again in the GetOPCData processor to configure.

Next, in the upper righthand corner of the NiFi Flow Configuration screen, click on the plus (+) icon to add a new Controller Service:

Add new controller service

Add the AvroSchemaRegistry to the Controller Services. Each property is essentially a name-to-schema pair that the flow can use to relate a piece of data to a schema.

Building data stream

Add the property by clicking the plus icon, naming it opcData, and copying the Avro schema into the value.

The schema will look like the Avro below:

{
  "type": "record",
  "name": "opcData",
  "fields": [
    { "name": "uri", "type": "string" },
    { "name": "ts", "type": "string" },
    { "name": "value", "type": "float" },
    { "name": "status", "type": "int" }
  ]
}
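To sanity-check that a parsed record actually matches this schema, here is a minimal sketch (plain Python, no Avro library required) that compares a record's field types against the opcData definition; the record values are hypothetical:

```python
# Python equivalents of the opcData Avro field types (string, string, float, int).
schema_types = {"uri": str, "ts": str, "value": float, "status": int}

# Hypothetical record parsed from a GetOPCData CSV line.
record = {"uri": "ns=2;s=Plant1.Pump01.Pressure", "ts": "1559260800000", "value": 42.7, "status": 0}

# A record is valid if every schema field is present with the expected type.
valid = all(isinstance(record.get(name), t) for name, t in schema_types.items())
print(valid)
```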

You now have the information needed to use the dynamic Avro schema for reading the OPC data records. Once the configuration is complete, click Apply and enable the controller.

InfluxDB record processor for Apache NiFi

Now it’s time to connect the InfluxDB processor to ingest the OPC data records. To provide the best possible ingest performance, InfluxData has created a new processor based on the NiFi record design.

Getting started:

Download and install the InfluxDB processors from the repository below. Copy the appropriate NAR file into the lib directory of your NiFi installation ($NIFI_HOME/lib) and restart NiFi.

https://github.com/influxdata/nifi-influxdb-bundle

Head back to the NiFi canvas and add the PutInfluxDatabaseRecord processor. Connect it to the previous GetOPCData processor:

PutInfluxDatabaseRecord

Right-click to configure the processor and select the PROPERTIES tab. Start with the Record Reader property and select the CSVReader controller service.

After adding, click on the edit arrow icon to configure:

CSVReader

In the controller service properties of the CSVReader, you will set the Schema Access Strategy and Schema Name:

Schema access strategy

We have previously registered our OPC-Data avro schema with the name opcData. Click apply and enable the controller services.

Return to the PutInfluxDatabaseRecord processor and go back to the PROPERTIES tab. You will want to create a new StandardInfluxDatabaseService for the InfluxDB Controller Service property.

After creating the service, click on the edit arrow icon to configure:

StandardInfluxDatabaseService

From the service configuration window, define your InfluxDB database, URL and access credentials. This will create a shareable connection service for all NiFi processors:

NiFi processors

When finished, click apply and enable the StandardInfluxDatabaseService controller.

Finally, going back to the PROPERTIES tab of your PutInfluxDatabaseRecord processor, you can now set the remaining properties to write the content of the records into the InfluxDB database.

Since our record does not contain a field with a Measurement property value, we will simply set it to an arbitrary name (opc):

Measurement tags fields

Using the name pairs from our schema, set the Tags, Fields, and Timestamp properties to names defined in the opcData schema.
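Under the hood, this mapping produces InfluxDB line protocol. The sketch below approximates what the processor does for one record, assuming measurement "opc", uri as the tag, value and status as fields, and ts as the timestamp (the record values are hypothetical):

```python
def escape_tag(value):
    # Line protocol requires escaping commas, spaces, and equals signs in tag values.
    return value.replace(",", r"\,").replace(" ", r"\ ").replace("=", r"\=")

def to_line_protocol(rec, measurement="opc"):
    # Line protocol shape: measurement,tag_set field_set timestamp
    tags = f"uri={escape_tag(rec['uri'])}"
    fields = f"value={rec['value']},status={rec['status']}i"  # trailing 'i' marks an integer field
    return f"{measurement},{tags} {fields} {rec['ts']}"

# Hypothetical opcData record from earlier in the flow.
record = {"uri": "ns=2;s=Plant1.Pump01.Pressure", "ts": "1559260800000", "value": 42.7, "status": 0}
line = to_line_protocol(record)
print(line)
```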

Final processor flow and controller services

Finally, bringing together all the processors and controllers enables a flow solution that intelligently collects and transmits more data to the data center over the WAN when a certain condition is met (for example, high pump pressures being recorded even when the downstream pumps are off). Once the controller services configuration is complete, it takes only about five more minutes to configure the flow.
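The conditional logic behind that flow can be sketched as a simple predicate. The threshold and field names below are hypothetical, chosen only to illustrate the "high pressure while downstream pumps are off" condition:

```python
PRESSURE_THRESHOLD = 100.0  # assumed threshold, for illustration only

def should_forward(pressure, downstream_pump_on):
    # Forward extra data over the WAN only when high pressure is recorded
    # while the downstream pumps are off.
    return pressure > PRESSURE_THRESHOLD and not downstream_pump_on

print(should_forward(135.2, False))  # high pressure, pumps off -> forward
```

In NiFi itself this kind of test would typically be expressed with a routing processor evaluating the record's fields, rather than in application code.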

Controller services:

NiFi flow configuration

  • AvroSchemaRegistry - Service for registering and accessing schemas
  • CSVReader - Parses CSV-formatted data, returning each row in the file as a record
  • StandardInfluxDatabaseService - Service that provides connection to InfluxDB
  • StandardOPCUAService - Fetches a response from an OPC UA server

Process flow:

OPCUA ingestion

  • GetOPCNodeList - Provides access to the tags currently in the OPC UA server
  • GetOPCData - Fetches values from the OPC UA server
  • PutInfluxDatabaseRecord - Writes NiFi record-structured data into InfluxDB

Now all that’s left is to start your NiFi flow.

Note: Consider the above flow a sample for testing only. In a production setting, you would ideally have more error-handling processors and redirect queues.

Explore your data in Chronograf

You can now easily jump into Chronograf and start creating dashboards on your sensor data. Click on Data Explorer and find your sensor fields in the nifi.autogen database. Once you start seeing data, you can begin creating dashboards.

Industrial automation NiFi

Conclusion

If you’re limited by the analytics capabilities of your traditional systems and are looking to leverage your OPC-based data in a more modern toolset, NiFi + InfluxDB make for a powerful combination that you can deploy quickly and securely.

Using this combination, administrators can quickly and easily integrate with enterprise-wide automation and business systems, and system integrators can remove legacy barriers between traditionally proprietary factory-floor devices and other manufacturing software.

Enterprise automation systems