AMQP Subscriber
The AMQP Subscriber plugin brings real-time AMQP messages from RabbitMQ and similar brokers into InfluxDB 3 without custom consumers. It simplifies ingestion for IoT, event-driven monitoring, and sensor pipelines, reducing pipeline fragility and getting operational data into InfluxDB faster with less engineering overhead.
Configuration
Plugin parameters are specified as key-value pairs in the --trigger-arguments flag (CLI) or in the trigger_arguments field (API) when creating a trigger. For complex mapping scenarios, the plugin also accepts a TOML configuration file, specified with the config_file_path parameter.
Plugin metadata
This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the InfluxDB 3 Explorer UI to display and configure the plugin.
Required parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | string | required | AMQP broker hostname or IP address |
| queues | string | required | Space-separated list of queue names to consume messages from (e.g., "sensor_data alerts") |
| table_name | string | required (json/text only) | Static InfluxDB measurement name. Required for json/text formats if table_name_field is not set. Not required for lineprotocol. |
| table_name_field | string | none | Field path to extract table name dynamically from message data. JSON: field name (auto-prefixed with $.). Text: regex pattern with capture group. Alternative to table_name. |
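The choice between table_name and table_name_field can be pictured with a small Python sketch. This is an illustration under stated assumptions, not the plugin's actual code: the function name resolve_table_name is hypothetical, the JSON case only handles a top-level key, and giving table_name precedence when both parameters are set is an assumption the text above does not confirm.

```python
import re


def resolve_table_name(args, message, message_format):
    """Pick a measurement name for one message (illustrative sketch only).

    `args` holds the trigger arguments, `message` is a dict for JSON
    messages or a str for text messages.
    """
    # Assumption: a static table_name wins if both parameters are set.
    if args.get("table_name"):
        return args["table_name"]

    selector = args.get("table_name_field")
    if not selector:
        raise ValueError("json/text formats need table_name or table_name_field")

    if message_format == "json":
        # Simplification: treat the field name as a top-level key.
        return str(message[selector])

    if message_format == "text":
        # The selector is a regex whose first capture group is the name.
        match = re.search(selector, message)
        if match is None:
            raise ValueError("table_name_field pattern did not match")
        return match.group(1)

    raise ValueError(f"unsupported format: {message_format!r}")


print(resolve_table_name({"table_name_field": "device"}, {"device": "thermostat"}, "json"))
```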
Connection parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| port | integer | 5672 | AMQP broker port (5672 for non-TLS, 5671 for TLS) |
| virtual_host | string | "/" | AMQP virtual host |
Authentication parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| username | string | "guest" | AMQP broker username |
| password | string | "guest" | AMQP broker password |
TLS/SSL parameters
| Parameter (CLI) | Parameter (TOML) | Type | Default | Description |
|---|---|---|---|---|
| ssl_ca_cert | [amqp.ssl] ca_cert | string | none | Path to CA certificate file. Enables SSL when provided. |
| ssl_client_cert | [amqp.ssl] client_cert | string | none | Path to client certificate for mutual TLS |
| ssl_client_key | [amqp.ssl] client_key | string | none | Path to client private key for mutual TLS |
Note: For mutual TLS, both ssl_client_cert and ssl_client_key must be provided together. SSL is automatically enabled when ssl_ca_cert is provided.
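The two rules in the note can be checked before connecting. The following Python sketch is only an illustration: the function name validate_tls_args and the returned dictionary keys are hypothetical, and it validates the arguments without opening any certificate files.

```python
def validate_tls_args(args):
    """Check the TLS trigger arguments for consistency (illustrative sketch)."""
    ca_cert = args.get("ssl_ca_cert")
    client_cert = args.get("ssl_client_cert")
    client_key = args.get("ssl_client_key")

    # Mutual TLS needs both halves of the client identity.
    if bool(client_cert) != bool(client_key):
        raise ValueError(
            "ssl_client_cert and ssl_client_key must be provided together"
        )

    return {
        # Per the note, SSL is switched on by the presence of ssl_ca_cert.
        "ssl_enabled": bool(ca_cert),
        "mutual_tls": bool(client_cert and client_key),
    }


print(validate_tls_args({"ssl_ca_cert": "certs/ca.crt"}))
```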
Message handling parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| ack_policy | string | "on_success" | When to acknowledge messages: on_success or always |
| requeue_on_failure | string | "false" | Whether to requeue messages that fail processing. When true, failed messages are returned to the queue for retry. |
| max_messages | integer | 500 | Maximum number of messages to retrieve per scheduled call |
Acknowledgement policies:
- on_success - Acknowledge only after successful processing. Failed messages are rejected (requeue controlled by requeue_on_failure).
- always - Acknowledge all messages at the end of processing, regardless of success or failure.
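The interaction between ack_policy and requeue_on_failure can be summarized as a small decision function. This Python sketch is an illustration, not the plugin's code: the function names and the action labels ("ack", "reject_requeue", "reject_discard") are hypothetical, and treating only the string "true" as enabling requeue is an assumption.

```python
def parse_flag(value):
    """Interpret a string parameter such as "true"/"false" (assumed spelling)."""
    return str(value).strip().lower() == "true"


def ack_action(ack_policy, processed_ok, requeue_on_failure="false"):
    """Return what to do with one message after processing (illustrative)."""
    if ack_policy == "always":
        # Acknowledged regardless of the processing outcome.
        return "ack"
    if ack_policy == "on_success":
        if processed_ok:
            return "ack"
        # Failed messages are rejected; requeue depends on the flag.
        return "reject_requeue" if parse_flag(requeue_on_failure) else "reject_discard"
    raise ValueError(f"unknown ack_policy: {ack_policy!r}")


for ok in (True, False):
    print(ok, ack_action("on_success", ok, "true"))
```

One consequence worth noting: with on_success and requeue_on_failure set to true, a message that can never be processed keeps returning to the queue, so the default of "false" is the safer starting point.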
Message format parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| format | string | "json" | Message format: json, lineprotocol, or text |
| timestamp_field | string | none | Field containing timestamp (format depends on message format) |
Format-specific timestamp_field syntax:
The timestamp field format differs between JSON and Text formats:
| Format | Syntax | Split Method | Example (CLI) | Example (TOML) |
|---|---|---|---|---|
| JSON | field_name:format | Split by first : | "timestamp:ms" | "$.timestamp:ms" |
| Text | regex:format | Split by last : | "ts:(\\d+):ms" | "ts:(\\d+):ms" |
Note: In CLI arguments, JSON paths are specified without the $. prefix (it is added automatically). In TOML configuration, use full JSONPath syntax with the $. prefix.
Note: Text format uses the last colon to split, allowing regex patterns to contain colons (e.g., time patterns).
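The difference between the two split rules is easy to see in code. The Python sketch below is an illustration of the rule as described, with a hypothetical function name (split_timestamp_field); it uses single backslashes in raw strings where the examples above show doubled ones.

```python
def split_timestamp_field(spec, message_format):
    """Split a timestamp_field value into (selector, timestamp_format)."""
    if message_format == "json":
        # JSON: everything before the FIRST colon is the field name.
        selector, _, ts_format = spec.partition(":")
    elif message_format == "text":
        # Text: everything before the LAST colon is the regex, so the
        # pattern itself may contain colons.
        selector, _, ts_format = spec.rpartition(":")
    else:
        raise ValueError(f"unsupported format: {message_format!r}")

    if not selector or not ts_format:
        raise ValueError(f"expected '<selector>:<format>', got {spec!r}")
    return selector, ts_format


print(split_timestamp_field("timestamp:ms", "json"))
print(split_timestamp_field(r"ts:(\d+):ms", "text"))
```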
Supported timestamp formats:
- ns - nanoseconds (Unix timestamp)
- ms - milliseconds (Unix timestamp)
- s - seconds (Unix timestamp)
- datetime - ISO 8601 string (e.g., “2021-12-01T12:00:00Z”)
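As a rough illustration of what the four formats mean, the Python sketch below converts each one to Unix nanoseconds. It is not the plugin's implementation: the function name to_nanoseconds is hypothetical, numeric formats are assumed to be whole numbers, and treating a datetime string without an offset as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone

# Multipliers from each numeric unit to nanoseconds.
_TO_NS = {"ns": 1, "ms": 1_000_000, "s": 1_000_000_000}
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_nanoseconds(value, ts_format):
    """Convert a raw timestamp value to Unix nanoseconds (illustrative)."""
    if ts_format in _TO_NS:
        # Assumption: the value is an integer or an integer-valued string.
        return int(value) * _TO_NS[ts_format]

    if ts_format == "datetime":
        text = str(value)
        # Older Python versions do not accept a trailing "Z" in fromisoformat.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            # Assumption: naive datetimes are interpreted as UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        # Integer arithmetic avoids floating-point rounding.
        microseconds = (parsed - _EPOCH) // timedelta(microseconds=1)
        return microseconds * 1000

    raise ValueError(f"unsupported timestamp format: {ts_format!r}")


print(to_nanoseconds("2021-12-01T12:00:00Z", "datetime"))
```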
JSON format parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| tags | string | none | Space-separated tag names. Example: "room sensor location" |
| fields | string | required | Space-separated field mappings. Format: "name:type=jsonpath", with the JSON path written without the $. prefix |
Field specification format: "temp:float=temperature hum:int=humidity status:bool=online"
Supported field types: int, uint, float, string, bool
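To make the field specification concrete, here is a minimal Python sketch that parses the example string and applies it to a JSON message. It is an illustration only: all function names are hypothetical, the path lookup handles plain dotted keys rather than full JSONPath, and the casting rules for bool and uint are assumptions.

```python
import json


def to_bool(value):
    # bool("false") would be True, so compare text explicitly.
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() == "true"


def to_uint(value):
    number = int(value)
    if number < 0:
        raise ValueError(f"uint field cannot be negative: {value!r}")
    return number


CASTERS = {"int": int, "uint": to_uint, "float": float, "string": str, "bool": to_bool}


def parse_field_specs(spec):
    """Turn "name:type=path ..." into (name, type, "$.path") tuples."""
    parsed = []
    for item in spec.split():
        left, _, path = item.partition("=")
        name, _, type_name = left.partition(":")
        if not name or not path or type_name not in CASTERS:
            raise ValueError(f"invalid field spec: {item!r}")
        # CLI-style paths omit the "$." prefix; add it here.
        parsed.append((name, type_name, "$." + path))
    return parsed


def lookup(document, json_path):
    """Follow a simple dotted path such as "$.a.b" (not full JSONPath)."""
    current = document
    for key in json_path[2:].split("."):
        current = current[key]
    return current


def extract_fields(payload, spec):
    document = json.loads(payload)
    return {
        name: CASTERS[type_name](lookup(document, path))
        for name, type_name, path in parse_field_specs(spec)
    }


message = '{"temperature": 21.5, "humidity": 40, "online": true}'
print(extract_fields(message, "temp:float=temperature hum:int=humidity status:bool=online"))
```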
Text format parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| tags | string | none | Space-separated tag patterns. Format: "name=regex_pattern" |
| fields | string | required | Space-separated field patterns. Format: "name:type=regex_pattern" |
Tag specification format: "room=room:([^,\\s]+) sensor=sensor:(\\w+)"
Field specification format: "temp:float=temp:([\\d.]+) status:bool=(true|false)"
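The two specification strings above can be exercised against a sample message with a short Python sketch. This is an illustration under assumptions, not the plugin's code: the function name extract_text and the sample message are hypothetical, patterns are written as Python raw strings with single backslashes, and skipping tags or fields whose pattern does not match is an assumed behavior.

```python
import re

CASTERS = {
    "int": int,
    "uint": int,  # simplification: no sign check in this sketch
    "float": float,
    "string": str,
    "bool": lambda text: text.strip().lower() == "true",
}


def extract_text(message, tag_spec, field_spec):
    """Apply "name=regex" tags and "name:type=regex" fields to a text message."""
    tags = {}
    for item in tag_spec.split():
        # Split on the first "=" only, so the regex may contain "=".
        name, _, pattern = item.partition("=")
        match = re.search(pattern, message)
        if match:
            tags[name] = match.group(1)

    fields = {}
    for item in field_spec.split():
        left, _, pattern = item.partition("=")
        name, _, type_name = left.partition(":")
        match = re.search(pattern, message)
        if match:
            fields[name] = CASTERS[type_name](match.group(1))
    return tags, fields


sample = "room:kitchen,sensor:dht22,temp:21.5,online=true"
tags, fields = extract_text(
    sample,
    r"room=room:([^,\s]+) sensor=sensor:(\w+)",
    r"temp:float=temp:([\d.]+) status:bool=(true|false)",
)
print(tags, fields)
```

Because the specifications are space-separated, a pattern in this form cannot itself contain a literal space; \s can stand in for one.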
TOML configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| config_file_path | string | none | Path to TOML config file (absolute or relative) |
To use a TOML configuration file, specify the config_file_path in the trigger arguments.
File path resolution
All file paths in the plugin (configuration file, TLS certificates) follow the same resolution logic:
- Absolute paths (e.g., /etc/amqp/config.toml) are used as-is
- Relative paths (e.g., config.toml, certs/ca.crt) are resolved from the PLUGIN_DIR environment variable
If a relative path is specified and PLUGIN_DIR is not set, the plugin will return an error.
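The resolution rules can be sketched in a few lines of Python. This is an illustration of the described behavior rather than the plugin's source: the function name resolve_plugin_path and the error message are hypothetical, and the environment is passed in as an optional mapping so the logic can be tried without changing real environment variables.

```python
import os
from pathlib import Path


def resolve_plugin_path(path, environ=None):
    """Resolve a config or certificate path as described above (sketch)."""
    env = os.environ if environ is None else environ
    candidate = Path(path)

    # Absolute paths are used as-is.
    if candidate.is_absolute():
        return candidate

    # Relative paths need PLUGIN_DIR as their base directory.
    plugin_dir = env.get("PLUGIN_DIR")
    if not plugin_dir:
        raise ValueError(
            f"relative path {path!r} requires the PLUGIN_DIR environment variable"
        )
    return Path(plugin_dir) / candidate


print(resolve_plugin_path("certs/ca.crt", {"PLUGIN_DIR": "/opt/plugins"}))
```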
Example TOML configuration
amqp_config_example.toml - comprehensive configuration example with all three message formats