Setting Up an MQTT Data Pipeline with InfluxDB
By Cole Bowden, Apr 17, 2026
In this blog, we’re going to take a look at how you can set up a fully functioning, robust data pipeline that centralizes your data into an InfluxDB instance by collecting and sending messages with the MQTT protocol. We’ll start with a brief overview of the technologies and protocols used in the pipeline, then dive into how you can connect, configure, and test them to ensure your data pipeline is fully functional. It’s going to be a long post, so let’s jump right in.
What is MQTT?
MQTT is an industry-standard, lightweight protocol for moving messages through a network of devices. It functions by having a broker, or multiple brokers, receive messages from individual devices (publishing clients) across the network, and publish those messages to external systems (destination clients) that are connected and listening to the broker. By categorizing messages into “topics,” systems that subscribe to specific topics can opt to receive only messages they’re interested in.
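For example, subscribers can use the wildcards + (match one topic level) and # (match all remaining levels) to choose which messages they receive. Here's a simplified sketch of the matching rule in Python (it ignores edge cases like $-prefixed system topics, so treat it as illustrative, not a spec-complete implementation):

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT topic-filter matching: '+' matches exactly one
    level, '#' (only valid as the final level) matches any remainder."""
    f_parts = topic_filter.split("/")
    t_parts = topic.split("/")
    for i, f in enumerate(f_parts):
        if f == "#":
            return True  # matches this level and everything below it
        if i >= len(t_parts):
            return False  # topic is shorter than the filter
        if f != "+" and f != t_parts[i]:
            return False  # literal level mismatch
    return len(f_parts) == len(t_parts)

# A subscriber to "sensors/+/temp" gets every device's temperature, nothing else:
assert topic_matches("sensors/+/temp", "sensors/device1/temp")
assert not topic_matches("sensors/+/temp", "sensors/device1/velocity")
assert topic_matches("sensors/#", "sensors/device1/velocity")
```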
As a lightweight protocol with a number of prominent open source implementations, MQTT is an industry standard for a variety of use cases. It’s particularly common in Internet of Things (IoT) and Industrial IoT (IIoT) applications, but can be leveraged anywhere you have a distributed network of devices generating data or messages. This includes fleet management, home automation, real-time telemetry on computer hardware, and practically any use case where sensors generate data points periodically.
Why use InfluxDB for MQTT data?
If you’ve already concluded that the MQTT protocol is the right way to move your data from various devices into a centralized broker, odds are that you’re working with time series data. Time series data has a couple of key characteristics: it’s a sequence of data collected in chronological order, and all data points contain a timestamp. Most commonly, this also means there’s a large volume of data. Hundreds or thousands of sensors generating new data points every second can quickly turn into millions or billions of records per day. As the scale of data increases, the need for a specialized, purpose-built solution to handle this volume grows, too.
That’s where InfluxDB, the industry-leading time series database, comes in. InfluxDB is purpose-built for the time series data common in MQTT use case scenarios, delivering unparalleled performance and a number of dedicated features to make managing and working with your time series data as easy as possible.
Performance is critical because ingesting millions or billions of data points per day can strain most databases. Because time series databases like InfluxDB are optimized to handle that firehose of continuous data, they can scale to ingest it with greater efficiency and lower costs. A custom-built storage engine eliminates snags that most other types of databases encounter, such as index maintenance and contention locks. Last-value caches and engine optimizations for timestamp-based filtering make retrieving recent data extremely efficient, so fresh data written into InfluxDB can be queried in less than 10 milliseconds, minimizing time to insight (or as we like to call it, “time to awesome”). This ensures a real-time view of the data generated across your network of devices.
Time series functionality also makes managing and working with this data much easier, regardless of whether performance at scale is a concern. DataFusion, the SQL query engine embedded in InfluxDB 3, makes it easy to query with a language most data professionals and AI agents already know. With dedicated time-based functions, queries that look like this in a general-purpose database:
WITH hours AS (
  SELECT generate_series(
    date_trunc('hour', now() - interval '24 hours'),
    date_trunc('hour', now()),
    interval '1 hour'
  ) AS hour_bucket
),
sensors AS (
  SELECT DISTINCT sensor_id FROM sensor_data
),
hour_sensor AS (
  SELECT h.hour_bucket, s.sensor_id
  FROM hours h
  CROSS JOIN sensors s
),
agg AS (
  SELECT
    sensor_id,
    date_trunc('hour', time) AS hour_bucket,
    percentile_cont(0.95) WITHIN GROUP (ORDER BY temperature) AS p95
  FROM sensor_data
  WHERE time >= now() - interval '24 hours'
  GROUP BY sensor_id, hour_bucket
)
SELECT
  hs.hour_bucket,
  hs.sensor_id,
  COALESCE(a.p95, 0) AS p95
FROM hour_sensor hs
LEFT JOIN agg a USING (hour_bucket, sensor_id)
ORDER BY hs.sensor_id, hs.hour_bucket;
Can be shortened to this in InfluxDB:
SELECT
  date_bin_gapfill(INTERVAL '1 hour', time) AS hour,
  sensor_id,
  interpolate(percentile(temperature, 95)) AS p95
FROM sensor_data
WHERE time >= NOW() - INTERVAL '24 hours'
GROUP BY hour, sensor_id;
Admittedly, this is a cherry-picked example for a complicated function most users won’t use every day, but there are plenty that aren’t. The InfluxDB 3 processing engine comes with a host of built-in plugins for processing and transforming data as it’s written, monitoring and anomaly detection, forecasting, and alerting. Retention policies can be set at the database or table level, ensuring you keep data as long as it’s useful, and the downsampling plugin for the processing engine can help you keep your data at a lower resolution once it’s past the end of that policy. InfluxDB also connects to a broad ecosystem of data visualization tools and client libraries and, critically for this tutorial, integrates seamlessly with Telegraf, the data collection agent we’ll use to move data from our MQTT broker into InfluxDB.
The MQTT -> InfluxDB pipeline
The architecture of this data pipeline is relatively straightforward, with data flowing in one direction throughout:
- Devices, sensors, and anything generating raw data are set up as an MQTT publishing client connected to the broker.
- The MQTT broker receives the raw data from the various publishers and forwards it.
- Telegraf subscribes to the published topics and then writes data into InfluxDB.
- The InfluxDB processing engine handles all necessary transformations and makes the data immediately available for querying and visualization.
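To make the flow concrete, here's a toy, in-memory model of the pipeline. The real pieces (Mosquitto, Telegraf, InfluxDB) are set up in the sections below; this sketch only illustrates the direction data travels, with exact-topic subscriptions standing in for real MQTT routing:

```python
from collections import defaultdict

class ToyBroker:
    """In-memory stand-in for an MQTT broker (exact-topic subscriptions only)."""
    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, topic, callback):
        self.subscribers[topic].append(callback)

    def publish(self, topic, payload):
        # Forward the message to every subscriber of this topic
        for cb in self.subscribers[topic]:
            cb(topic, payload)

# "Telegraf": subscribes to the broker and writes into "InfluxDB"
# (here, just a dict of lists keyed by topic).
influxdb = defaultdict(list)
broker = ToyBroker()
broker.subscribe("sensors/device1/temp", lambda t, p: influxdb[t].append(p))

# A device publishes a reading; it flows broker -> collector -> database.
broker.publish("sensors/device1/temp", 21.5)
assert influxdb["sensors/device1/temp"] == [21.5]
```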
So let’s jump into specifics.
Setting Up the MQTT Broker and Clients
The first thing you’re going to need to do is install the MQTT technology of your choice on every device that’s going to be a publishing client, as well as on the server you want to act as your broker. Eclipse Mosquitto is a common open source option that we’ll use in this guide, but other MQTT tools, such as the HiveMQ broker, the Paho client libraries, or client utilities like MQTTX, MQTT Explorer, and EasyMQTT, will also work for this tutorial. The exact commands will differ depending on what you’re using, but the concepts remain the same, as MQTT is a standardized protocol.
To install Eclipse Mosquitto:
- On Linux, run: snap install mosquitto
- On Mac: install Homebrew, then run brew install mosquitto
- On Windows: go to the Mosquitto download page and install from there
When you install Mosquitto, the installer will tell you the exact file path of the configuration file. You’ll want to configure your broker first, and you should set up authentication if you don’t want to allow unauthenticated connections. A lack of authentication can be fine if you’re running everything on a local network without any port forwarding, but it’s not recommended if your devices communicate over the internet.
There are many different ways to set up authentication with Mosquitto. One of the simplest is creating a password file with the mosquitto_passwd command, but you can read a full list of options on their documentation page for authentication methods. Whatever you settle on, if you decide to use some form of authentication, you’ll need to add the following line to your Mosquitto configuration file:
allow_anonymous false
There are many other configuration options in the documentation, and what you set and configure will depend on your use case, but some you may want to consider are:
- persistence false: Because we’re writing to InfluxDB, we don’t need to persist messages to disk.
- log_dest stdout: For setting up, testing, and debugging, outputting logs directly to the terminal makes things easier.
And of course, make sure your listener is configured on the same port for all devices. The default is 1883, but you can change this if desired.
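Putting those options together, a minimal mosquitto.conf for this pipeline might look like the following sketch (the password file path is just an example; adjust it for wherever you created yours):

```conf
# Minimal Mosquitto broker configuration for this tutorial
listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd
persistence false
log_dest stdout
```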
Once you configure your broker, you can set up your publishing clients, and with whatever data you’re measuring, they can publish messages to the broker with the command:
mosquitto_pub -h "host" -t "topic" -m "value"
If you’re running this all on a local network, your host will be localhost; otherwise, it’ll be the address where your broker is running. The value should be whatever you’re measuring and publishing at that moment.
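If your publishing clients are scripted, you can wrap that command in a small helper. This is only a sketch: the host, topic, and credentials are placeholders, and actually publishing requires the mosquitto clients installed and a running broker.

```python
import subprocess

def build_pub_command(host, topic, value, username=None, password=None):
    """Assemble a mosquitto_pub invocation for a single reading."""
    cmd = ["mosquitto_pub", "-h", host, "-t", topic, "-m", str(value)]
    if username and password:
        cmd += ["-u", username, "-P", password]  # broker credentials, if configured
    return cmd

cmd = build_pub_command("localhost", "/sensors/vehicles/v1/device1/temp", 21.5)
# To actually publish (requires a reachable broker):
# subprocess.run(cmd, check=True)
```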
Your topic can be whatever is appropriate to label that value. If you have different devices and different types of measurements for each device, it’s recommended to nest your topics and organize them in a way that makes logical sense. For example, if you have many different devices measuring, say, temperature and velocity, your topic arrangement may look like:
- /sensors/vehicles/v1/device1/temp
- /sensors/vehicles/v1/device1/velocity
- /sensors/vehicles/v1/device2/temp
- /sensors/vehicles/v1/device2/velocity
As long as you have a unique topic structure for each type of value being sent, we can parse and sort this into tags and fields with InfluxDB. For further information on setting up MQTT topics, there are plenty of great guides on the matter.
With your clients and broker configured, your clients publishing messages, and your broker receiving and forwarding those messages, you should be all set up for the MQTT portion of this data pipeline.
Installing InfluxDB
The next step is to move your MQTT data into InfluxDB, which means installing InfluxDB first. You can check out our docs on installing it here, but the simplest and easiest way to get started is to run the install script provided by InfluxData:
curl -O https://www.influxdata.com/d/install_influxdb3.sh \
&& sh install_influxdb3.sh
These should work on every operating system and provide you with some simple options to get started with InfluxDB 3 Core or Enterprise. The installation script should also give you an admin token, which you’ll want to store somewhere safe so you can use it for authentication. If you’d like to further configure your InfluxDB 3 instance, the installation script should tell you where all files and configuration files were installed for further adjusting, though it should run fine out of the box.
If you have Docker installed, you can also install the InfluxDB Explorer UI as part of this process, giving you an easy way to view, manage, and query your InfluxDB 3 instance. You can reach it by navigating to localhost:8888 in your browser, entering host.docker.internal:8181 for the server address, and providing the admin token.
Installing and Configuring Telegraf
With InfluxDB 3 installed and running, the last step to get the data pipeline operational is to install and configure Telegraf to connect our MQTT broker to InfluxDB. Telegraf installation varies by operating system and Linux distribution, so check out the Telegraf documentation on installation to find the right files or command to run.
If you’re on Mac or Linux, installing via a package manager will generate a default configuration file for you:
- On Mac (installed via Homebrew): /usr/local/etc/telegraf.conf
- On Linux: /etc/telegraf/telegraf.conf
Otherwise, you’ll need to create an empty configuration file or generate one with telegraf config > telegraf.conf. Once you have located or created your configuration file, all that’s left to do is connect Telegraf to your MQTT Broker and InfluxDB.
InfluxDB is very easy to configure a connection to, and you can add these lines to the config file:
[[outputs.influxdb_v2]]
urls = ["InfluxDB address & port"]
token = "admin token"
organization = "org name"
bucket = "destination database"
- The InfluxDB address and port should be wherever you have InfluxDB installed. If you’re running on a local network, this will be http://127.0.0.1:8181; otherwise, it’ll be the IP and port.
- Token is the admin token you copied from installation.
- Organization can be whatever you’d like to name it.
- Bucket should be the name of the database you’re writing all your MQTT data to. You don’t have to create the database first.
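For context, Telegraf ultimately writes points to InfluxDB as line protocol. Here's a simplified sketch of that format (it omits the escaping and type-suffix rules of the real protocol, so don't use it for production serialization):

```python
def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Render one point as simplified InfluxDB line protocol:
    measurement,tag=v,... field=v,... timestamp"""
    tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    field_str = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{measurement},{tag_str} {field_str} {timestamp_ns}"

line = to_line_protocol(
    "sensors",
    {"device_type": "vehicles", "version": "v1", "device_name": "device1"},
    {"temp": 21.5},
    1744900000000000000,
)
# -> "sensors,device_name=device1,device_type=vehicles,version=v1 temp=21.5 1744900000000000000"
```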
Setting up a connection to your MQTT broker is also straightforward:
[[inputs.mqtt_consumer]]
servers = ["broker address"]
topics = ["list of topics"]
data_format = "value"
data_type = "data_type"
## if you have username and password authentication for MQTT
username = "username"
password = "password"
- The broker address is once again the address and port where your MQTT broker is running. For a local network, this will be tcp://127.0.0.1:1883.
- Topics is a comma-separated list of the topics you want to subscribe to.
- Data type is the primitive data type being written: integer, float, long, string, or boolean.
This is all you need in your configuration file to have the full pipeline running! If you run telegraf with telegraf --config telegraf.conf, you should be able to send a message from an MQTT publisher and view that data in InfluxDB.
However, you can make some improvements in Telegraf’s configuration to help parse and organize your data by topic. By default, every message lands in the same table, with the full topic in a single tag column and a monolithic “value” column holding all your values, which isn’t a very good data model. With topic parsing and pivot processing added to the configuration, we can specify which part of the topic defines the table the data is written into, turn every level of the topic into a tag, and pivot on the last level of the topic so that each raw value becomes its own field:
[[inputs.mqtt_consumer]]
servers = ["broker address"]
topics = ["/sensors/#"]
data_format = "value"
data_type = "data_type"
## if you have username and password authentication for MQTT
username = "username"
password = "password"
[[inputs.mqtt_consumer.topic_parsing]]
measurement = "/measurement/_/_/_/_"
tags = "/_/device_type/version/device_name/field"
[[processors.pivot]]
tag_key = "field"
value_key = "value"
This takes a value from the /sensors/vehicles/v1/device1/temp topic and writes it to the sensors table. The tag columns populate with device_type = vehicles, version = v1, and device_name = device1, and temp becomes a field whose value is whatever your MQTT publisher sent. You can modify this configuration as appropriate for your topics, and the documentation provides full information on everything that can be done.
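To illustrate what those templates do, here's a rough Python mimic of the parsing (not Telegraf's actual implementation, just the mapping it performs for this particular topic shape):

```python
def parse_topic(topic: str):
    """Mimic the topic_parsing templates above:
    measurement = "/measurement/_/_/_/_"
    tags        = "/_/device_type/version/device_name/field"
    A leading '/' in the topic yields an empty first segment."""
    parts = topic.split("/")  # "/sensors/vehicles/v1/device1/temp" -> ['', 'sensors', ...]
    measurement = parts[1]
    tags = {
        "device_type": parts[2],
        "version": parts[3],
        "device_name": parts[4],
        "field": parts[5],
    }
    return measurement, tags

m, tags = parse_topic("/sensors/vehicles/v1/device1/temp")
assert m == "sensors"
assert tags["device_name"] == "device1" and tags["field"] == "temp"
# The pivot processor then combines the "field" tag with the raw value,
# producing a field column like temp=<published value>.
```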
Further improvements
With MQTT data being published, parsed, and written into InfluxDB, you’ve fully set up an MQTT data pipeline! However, there’s a lot more you can do:
- View and query your data with the InfluxDB Explorer UI, as discussed earlier.
- Connect any one of the many client libraries to access your data and use it for downstream applications, or to a data visualization tool for dashboarding and insight into what’s being written.
- Use the InfluxDB 3 processing engine for further transformations and processing of your data as it’s written.
- Set up alerts, monitoring, forecasting, and more with the processing engine, too.
The final product
By integrating MQTT, Telegraf, and InfluxDB, you’ve constructed a robust, fully functioning data pipeline capable of efficiently centralizing real-time telemetry. The lightweight MQTT protocol ensures that messages from your distributed network flow reliably to the broker, while Telegraf acts as the collection agent for seamless ingestion and transformation. Finally, InfluxDB provides the purpose-built storage and specialized features needed to query and visualize your data in minimal time. This architecture establishes a solid foundation for turning raw event streams into meaningful insights, minimizing your time to awesome.