AMQP Subscriber
The AMQP Subscriber Plugin enables real-time ingestion of AMQP messages (RabbitMQ and other AMQP-compatible brokers) into InfluxDB 3. Subscribe to queues and automatically transform messages into time-series data with support for JSON, Line Protocol, and custom text formats. The plugin provides flexible message acknowledgement policies and comprehensive error tracking with statistics.
Software Requirements
- InfluxDB 3 Core/Enterprise: with the Processing Engine enabled
- Python packages:
  - pika (AMQP client library)
  - jsonpath-ng (JSON path parsing for JSON format)
Installation steps
- Start InfluxDB 3 with the Processing Engine enabled (--plugin-dir /path/to/plugins):
  influxdb3 serve \
  --node-id node0 \
  --object-store file \
  --data-dir ~/.influxdb3 \
  --plugin-dir ~/.plugins
- Install required Python packages:
  influxdb3 install package pika
  influxdb3 install package jsonpath-ng
Configuration
Plugin parameters may be specified as key-value pairs in the --trigger-arguments flag (CLI) or in the trigger_arguments field (API) when creating a trigger. This plugin supports TOML configuration files for complex mapping scenarios, which can be specified using the config_file_path parameter.
Plugin metadata
This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the InfluxDB 3 Explorer UI to display and configure the plugin.
Required parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| host | string | required | AMQP broker hostname or IP address |
| queues | string | required | Space-separated list of queue names to consume messages from (e.g., "sensor_data alerts") |
| table_name | string | required (json/text only) | Static InfluxDB measurement name. Required for json/text formats if table_name_field is not set. Not required for lineprotocol. |
| table_name_field | string | none | Field path to extract table name dynamically from message data. JSON: field name (auto-prefixed with $.). Text: regex pattern with capture group. Alternative to table_name. |
Connection parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| port | integer | 5672 | AMQP broker port (5672 for non-TLS, 5671 for TLS) |
| virtual_host | string | "/" | AMQP virtual host |
Authentication parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| username | string | "guest" | AMQP broker username |
| password | string | "guest" | AMQP broker password |
TLS/SSL parameters
| Parameter (CLI) | Parameter (TOML) | Type | Default | Description |
|---|---|---|---|---|
| ssl_ca_cert | [amqp.ssl] ca_cert | string | none | Path to CA certificate file. Enables SSL when provided. |
| ssl_client_cert | [amqp.ssl] client_cert | string | none | Path to client certificate for mutual TLS |
| ssl_client_key | [amqp.ssl] client_key | string | none | Path to client private key for mutual TLS |
Note: For mutual TLS, both ssl_client_cert and ssl_client_key must be provided together. SSL is automatically enabled when ssl_ca_cert is provided.
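The rule in the note above can be sketched as a small validation helper. This is an illustrative sketch, not the plugin's code: the function name tls_mode and its return values are hypothetical, and treating a client certificate pair without ssl_ca_cert as a plain connection is an assumption, since the documentation only states that SSL is enabled when ssl_ca_cert is provided.

```python
from typing import Optional


def tls_mode(ssl_ca_cert: Optional[str] = None,
             ssl_client_cert: Optional[str] = None,
             ssl_client_key: Optional[str] = None) -> str:
    """Classify trigger arguments as 'plain', 'tls', or 'mutual_tls' (hypothetical helper)."""
    # The client certificate and key are only valid as a pair.
    if bool(ssl_client_cert) != bool(ssl_client_key):
        raise ValueError(
            "Both ssl_client_cert and ssl_client_key must be provided for mutual TLS"
        )
    # Assumption: without a CA certificate, SSL stays disabled.
    if not ssl_ca_cert:
        return "plain"
    return "mutual_tls" if ssl_client_cert else "tls"
```

For example, tls_mode("certs/ca.crt") yields "tls", while passing only ssl_client_cert raises the error quoted in the Troubleshooting section.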
Message handling parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| ack_policy | string | "on_success" | When to acknowledge messages: on_success or always |
| requeue_on_failure | string | "false" | Whether to requeue messages that fail processing. When true, failed messages are returned to the queue for retry. |
| max_messages | integer | 500 | Maximum number of messages to retrieve per scheduled call |
Acknowledgement policies:
- on_success - Acknowledge only after successful processing. Failed messages are rejected (requeue controlled by requeue_on_failure).
- always - Acknowledge all messages at the end of processing, regardless of success or failure.
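The two policies can be summarized as a decision table. The sketch below is a hedged illustration of the behavior described above; the function ack_action and its string return values are hypothetical names, not part of the plugin.

```python
def ack_action(ack_policy: str, succeeded: bool, requeue_on_failure: bool = False) -> str:
    """Return what should happen to one message: 'ack', 'reject_requeue', or 'reject_discard'."""
    if ack_policy == "always":
        # Acknowledged regardless of the processing result.
        return "ack"
    if ack_policy == "on_success":
        if succeeded:
            return "ack"
        # Failed messages are rejected; requeue_on_failure decides whether they return to the queue.
        return "reject_requeue" if requeue_on_failure else "reject_discard"
    raise ValueError(f"unknown ack_policy: {ack_policy!r}")
```

With the defaults (ack_policy=on_success, requeue_on_failure=false), a failed message maps to "reject_discard", which is why a dead-letter queue is useful for keeping rejected messages.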
Message format parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| format | string | "json" | Message format: json, lineprotocol, or text |
| timestamp_field | string | none | Field containing timestamp (format depends on message format) |
Format-specific timestamp_field syntax:
The timestamp field format differs between JSON and Text formats:
| Format | Syntax | Split Method | Example (CLI) | Example (TOML) |
|---|---|---|---|---|
| JSON | field_name:format | Split by first : | "timestamp:ms" | "$.timestamp:ms" |
| Text | regex:format | Split by last : | "ts:(\\d+):ms" | "ts:(\\d+):ms" |
Note: In CLI arguments, JSON paths are specified without $. prefix (added automatically). In TOML configuration, use full JSONPath syntax with $. prefix.
Note: Text format uses the last colon to split, allowing regex patterns to contain colons (e.g., time patterns).
Supported timestamp formats:
- ns - nanoseconds (Unix timestamp)
- ms - milliseconds (Unix timestamp)
- s - seconds (Unix timestamp)
- datetime - ISO 8601 string (e.g., “2021-12-01T12:00:00Z”)
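To make the split rules and units concrete, here is a minimal Python sketch. The function names split_timestamp_spec and to_nanoseconds are hypothetical, and treating a datetime string without a UTC offset as UTC is an assumption; the plugin's actual parsing may differ.

```python
from datetime import datetime, timezone
from typing import Tuple

UNIT_TO_NS = {"ns": 1, "ms": 1_000_000, "s": 1_000_000_000}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def split_timestamp_spec(spec: str, message_format: str) -> Tuple[str, str]:
    """Split 'selector:format' at the first colon (json) or the last colon (text)."""
    if message_format == "json":
        selector, _, ts_format = spec.partition(":")
    elif message_format == "text":
        selector, _, ts_format = spec.rpartition(":")
    else:
        raise ValueError(f"timestamp_field is not used for format {message_format!r}")
    if not selector or not ts_format:
        raise ValueError(f"expected 'selector:format', got {spec!r}")
    return selector, ts_format


def to_nanoseconds(value, ts_format: str) -> int:
    """Convert a raw timestamp value to Unix nanoseconds."""
    if ts_format in UNIT_TO_NS:
        return int(value) * UNIT_TO_NS[ts_format]
    if ts_format == "datetime":
        parsed = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
        delta = parsed - EPOCH
        # Integer arithmetic avoids float rounding at nanosecond scale.
        return (delta.days * 86_400 + delta.seconds) * 1_000_000_000 + delta.microseconds * 1_000
    raise ValueError(f"unsupported timestamp format: {ts_format!r}")
```

Splitting the text spec "ts:(\d+):ms" at the last colon keeps the colon inside the regex intact, giving the pattern ts:(\d+) and the unit ms.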
JSON format parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| tags | string | none | Space-separated tag names. Example: "room sensor location" |
| fields | string | required | Space-separated field mappings. Format: "name:type=jsonpath" without $. |
Field specification format: "temp:float=temperature hum:int=humidity status:bool=online"
Supported field types: int, uint, float, string, bool
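As an illustration of how a fields string in this format might be broken down, the following sketch parses it into a name-to-(path, type) mapping. The function parse_json_fields is a hypothetical helper, and adding the $. prefix here mirrors the automatic prefixing described for CLI arguments; it is not taken from the plugin source.

```python
SUPPORTED_TYPES = {"int", "uint", "float", "string", "bool"}


def parse_json_fields(spec: str) -> dict:
    """Parse 'name:type=path ...' into {name: ('$.path', type)}."""
    fields = {}
    for item in spec.split():
        head, equals, path = item.partition("=")
        name, colon, type_name = head.partition(":")
        if not (equals and colon and name and path):
            raise ValueError(f"expected name:type=jsonpath, got {item!r}")
        if type_name not in SUPPORTED_TYPES:
            raise ValueError(f"unsupported field type {type_name!r} in {item!r}")
        # CLI paths are written without '$.'; add it to form a full JSONPath.
        fields[name] = ("$." + path, type_name)
    return fields
```

Applied to the example specification, this yields temp mapped to ("$.temperature", "float"), hum to ("$.humidity", "int"), and status to ("$.online", "bool").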
Text format parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| tags | string | none | Space-separated tag patterns. Format: "name=regex_pattern" |
| fields | string | required | Space-separated field patterns. Format: "name:type=regex_pattern" |
Tag specification format: "room=room:([^,\\s]+) sensor=sensor:(\\w+)"
Field specification format: "temp:float=temp:([\\d.]+) status:bool=(true|false)"
TOML configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| config_file_path | string | none | Path to TOML config file (absolute or relative) |
To use a TOML configuration file, specify the config_file_path in the trigger arguments.
File path resolution
All file paths in the plugin (configuration file, TLS certificates) follow the same resolution logic:
- Absolute paths (e.g., /etc/amqp/config.toml) are used as-is
- Relative paths (e.g., config.toml, certs/ca.crt) are resolved from the PLUGIN_DIR environment variable
If a relative path is specified and PLUGIN_DIR is not set, the plugin will return an error.
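The resolution logic can be sketched in a few lines of Python. The function resolve_plugin_path and its error message are hypothetical, and the optional environ parameter exists only to make the sketch easy to test; expanding a leading ~ in PLUGIN_DIR is also an assumption.

```python
import os
from typing import Mapping, Optional


def resolve_plugin_path(path: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Return absolute paths unchanged; resolve relative paths against PLUGIN_DIR."""
    env = os.environ if environ is None else environ
    if os.path.isabs(path):
        return path
    plugin_dir = env.get("PLUGIN_DIR")
    if not plugin_dir:
        # Mirrors the documented behavior: relative path without PLUGIN_DIR is an error.
        raise ValueError(f"PLUGIN_DIR is not set; cannot resolve relative path {path!r}")
    # Assumption: a leading '~' in PLUGIN_DIR is expanded to the home directory.
    return os.path.join(os.path.expanduser(plugin_dir), path)
```

With PLUGIN_DIR=/opt/plugins, the relative path certs/ca.crt resolves to /opt/plugins/certs/ca.crt, while /etc/amqp/config.toml is returned as-is.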
Example TOML configuration
amqp_config_example.toml - comprehensive configuration example with all three message formats
Quick Start
Scheduled ingestion with TOML configuration
Recommended for production use with complex mappings:
# 1. Set PLUGIN_DIR environment variable
export PLUGIN_DIR=~/.plugins
# 2. Copy and edit configuration file
cp amqp_config_example.toml $PLUGIN_DIR/my_amqp_config.toml
# Edit my_amqp_config.toml with your broker and mapping settings
# 3. Create the trigger
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/amqp_subscriber/amqp_subscriber.py \
--trigger-spec "every:10s" \
--trigger-arguments config_file_path=my_amqp_config.toml \
amqp_ingestion
Scheduled ingestion with command-line arguments
For simple JSON message ingestion:
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/amqp_subscriber/amqp_subscriber.py \
--trigger-spec "every:5s" \
--trigger-arguments 'host=localhost,queues=sensor_data alerts,format=json,table_name=sensor_data,fields=temp:float=temperature hum:int=humidity,tags=location sensor_id' \
amqp_sensors
Secure AMQP connection with TLS
influxdb3 create trigger \
--database mydb \
--plugin-filename gh:influxdata/amqp_subscriber/amqp_subscriber.py \
--trigger-spec "every:10s" \
--trigger-arguments 'host=secure-broker.example.com,port=5671,queues=secure_data,format=json,table_name=secure_data,ssl_ca_cert=certs/ca.crt,username=myuser,password=mypass,fields=value:float=value' \
secure_amqp
Troubleshooting
Check Plugin Logs
influxdb3 query --database _internal \
"SELECT * FROM system.processing_engine_logs
WHERE trigger_name = 'amqp_ingestion'
ORDER BY time DESC LIMIT 20"
Common Issues
“pika library not installed”
influxdb3 install package pika
“Configuration file not found”
- For relative paths, ensure the PLUGIN_DIR environment variable is set
- For absolute paths, verify the file exists at the specified location
# For relative paths
export PLUGIN_DIR=~/.plugins
ls $PLUGIN_DIR/my_amqp_config.toml
# Or use absolute path
ls /etc/amqp/my_amqp_config.toml
“Failed to connect to AMQP broker”
- Verify broker address and port
- Check network connectivity
- For TLS connections, verify certificate paths
- Verify authentication credentials
“Both ssl_client_cert and ssl_client_key must be provided for mutual TLS”
For mutual TLS authentication, both client certificate and key are required.
“No fields were mapped from JSON data”
- Verify JSONPath expressions in field mappings (TOML paths need the $. prefix; CLI arguments add it automatically)
- Check that the JSON structure matches your paths
- Review the amqp_exceptions table for detailed errors
Messages not being processed
- Check trigger status: influxdb3 show summary --database mydb
- Verify AMQP connection in plugin logs
- Ensure the queue exists and has messages
- Check that the queue name is correct
Failed messages are being discarded
By default, failed messages are rejected without requeue (requeue_on_failure=false):
- Set requeue_on_failure=true to return failed messages to the queue for retry
- Use ack_policy=always to acknowledge all messages regardless of errors
- Check amqp_exceptions table to diagnose parsing issues
- Configure a dead-letter queue in RabbitMQ to capture rejected messages for analysis
Data requirements
The plugin automatically creates the target measurement table on first write. Field mappings are required for JSON and Text formats to specify which fields to extract and their data types.
Message encoding requirements
- UTF-8 text only: The plugin only processes UTF-8 encoded text messages. Binary messages are automatically skipped with a warning logged.
- Non-empty payloads: Empty or whitespace-only messages are automatically skipped.
Message Formats
JSON Format
The primary use case for structured IoT data. Supports nested fields using JSONPath expressions.
TOML Configuration
[amqp]
host = "localhost"
port = 5672
queues = ["sensor_data", "alerts"]
format = "json"
[mapping.json]
table_name = "sensor_data"
timestamp_field = "$.timestamp:ms"
[mapping.json.tags]
location = "$.location"
sensor_id = "$.sensor.id"
[mapping.json.fields]
temperature = ["$.temp", "float"]
humidity = ["$.humidity", "int"]
status = ["$.online", "bool"]
Example Message
{
"timestamp": 1638360000000,
"location": "warehouse_a",
"sensor": {
"id": "sensor_001"
},
"temp": 22.5,
"humidity": 65,
"online": true
}
Resulting Data
sensor_data,location=warehouse_a,sensor_id=sensor_001 temperature=22.5,humidity=65i,status=true 1638360000000000000
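The mapping from the example message to the resulting line can be reproduced with a short Python sketch. It is illustrative only: lookup is a hypothetical stand-in for a JSONPath library and handles just simple dotted paths, tag and measurement escaping is omitted, and the timestamp is assumed to be in milliseconds as in the configuration above.

```python
def lookup(document: dict, path: str):
    """Resolve a simple '$.a.b' path (stand-in for a real JSONPath library)."""
    node = document
    for key in path[2:].split("."):  # drop the leading '$.'
        node = node[key]
    return node


def format_field(value, type_name: str) -> str:
    """Render one field value with its line protocol type suffix."""
    if type_name == "float":
        return repr(float(value))
    if type_name == "int":
        return f"{int(value)}i"
    if type_name == "uint":
        return f"{int(value)}u"
    if type_name == "bool":
        return "true" if value else "false"
    if type_name == "string":
        return '"' + str(value).replace("\\", "\\\\").replace('"', '\\"') + '"'
    raise ValueError(f"unsupported field type: {type_name!r}")


def to_line_protocol(message, table_name, tags, fields, timestamp_path, unit_ns=1_000_000):
    """Build one line protocol record; unit_ns converts the timestamp to nanoseconds."""
    tag_part = ",".join(f"{name}={lookup(message, path)}" for name, path in tags.items())
    field_part = ",".join(
        f"{name}={format_field(lookup(message, path), type_name)}"
        for name, (path, type_name) in fields.items()
    )
    timestamp_ns = int(lookup(message, timestamp_path)) * unit_ns
    prefix = f"{table_name},{tag_part}" if tag_part else table_name
    return f"{prefix} {field_part} {timestamp_ns}"


message = {
    "timestamp": 1638360000000,
    "location": "warehouse_a",
    "sensor": {"id": "sensor_001"},
    "temp": 22.5,
    "humidity": 65,
    "online": True,
}
line = to_line_protocol(
    message,
    table_name="sensor_data",
    tags={"location": "$.location", "sensor_id": "$.sensor.id"},
    fields={
        "temperature": ("$.temp", "float"),
        "humidity": ("$.humidity", "int"),
        "status": ("$.online", "bool"),
    },
    timestamp_path="$.timestamp",
)
print(line)
```

The printed line matches the Resulting Data shown above.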
JSON Array Support
Process batch messages containing arrays of JSON objects:
[
{"timestamp": 1638360000000, "sensor_id": "001", "temperature": 22.5},
{"timestamp": 1638360001000, "sensor_id": "002", "temperature": 23.1},
{"timestamp": 1638360002000, "sensor_id": "003", "temperature": 21.8}
]
Array processing behavior:
- Each array element is processed independently as a separate data point
- If one element fails to parse, the others continue processing (partial success)
- Parse errors for individual elements are logged to amqp_exceptions table
- Statistics count each AMQP message as one unit (received=1, processed=1), regardless of array size
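The partial-success behavior can be sketched as follows. This is a simplified illustration under stated assumptions: process_payload and the parse_point callback are hypothetical names, and the error tuples only approximate what is written to amqp_exceptions.

```python
import json
from typing import Callable, List, Tuple


def process_payload(body: str, parse_point: Callable[[dict], object]) -> Tuple[List[object], List[tuple]]:
    """Parse a JSON object or array; collect points and per-element errors separately."""
    data = json.loads(body)
    # A single object is treated as a one-element batch.
    elements = data if isinstance(data, list) else [data]
    points, errors = [], []
    for index, element in enumerate(elements):
        try:
            points.append(parse_point(element))
        except Exception as exc:
            # One bad element does not stop the rest of the array.
            errors.append((index, type(exc).__name__, str(exc)))
    return points, errors
```

A batch of three elements in which the second lacks a mapped field would produce two points and one error entry, while the AMQP message itself still counts as a single received message.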
Line Protocol Format
For messages already in InfluxDB line protocol format, use passthrough mode. No mapping configuration needed - messages are validated and written directly.
TOML Configuration
[amqp]
host = "rabbitmq.example.com"
queues = ["influxdb_metrics"]
format = "lineprotocol"
CLI Configuration
influxdb3 create trigger \
--database mydb \
--plugin-filename amqp_subscriber.py \
--trigger-spec "every:10s" \
--trigger-arguments 'host=rabbitmq.example.com,queues=influxdb_metrics,format=lineprotocol' \
amqp_lineprotocol
Example Message
sensor_data,location=warehouse_a,sensor_id=001 temperature=22.5,humidity=65i 1638360000000000000
Supported Line Protocol Types
| Type | Suffix | Example |
|---|---|---|
| Float | none | temperature=22.5 |
| Integer | i | count=100i |
| Unsigned Integer | u | bytes=1024u |
| String | "..." | status="running" |
| Boolean | true/false | active=true |
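The suffix rules in the table can be expressed as a small classifier. This is a rough sketch for illustration: classify_field_value is a hypothetical function, its patterns cover only the forms listed in the table, and it is not the validation the plugin performs.

```python
import re
from typing import Optional

# Order matters: suffixed integers must be tested before plain floats.
_VALUE_PATTERNS = [
    ("integer", re.compile(r"^-?\d+i$")),
    ("unsigned", re.compile(r"^\d+u$")),
    ("boolean", re.compile(r"^(?:true|false)$")),
    ("string", re.compile(r'^"(?:[^"\\]|\\.)*"$')),
    ("float", re.compile(r"^-?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?$")),
]


def classify_field_value(raw: str) -> Optional[str]:
    """Return the line protocol type of a raw field value, or None if unrecognized."""
    for type_name, pattern in _VALUE_PATTERNS:
        if pattern.match(raw):
            return type_name
    return None
```

For instance, classify_field_value("100i") returns "integer" and classify_field_value("22.5") returns "float".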
Text Format
Parse plain text messages using regular expressions:
TOML Configuration
[amqp]
format = "text"
[mapping.text]
table_name = "sensor_logs"
timestamp_field = "ts:(\\d+):ms"
[mapping.text.tags]
location = "location=([^,\\s]+)"
[mapping.text.fields]
temperature = ["temp:([\\d.]+)", "float"]
humidity = ["hum:(\\d+)", "int"]
status = ["status:(true|false)", "bool"]
Example Message
location=warehouse_a,temp:22.5,hum:65,status:true,ts:1638360000000
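To show how these patterns apply to the example message, here is a minimal Python sketch. The patterns are the TOML values above with TOML string escaping removed (so "\\d" becomes \d); the function names and the returned dictionary shape are hypothetical, not the plugin's internal representation.

```python
import re

TAG_PATTERNS = {"location": r"location=([^,\s]+)"}
FIELD_PATTERNS = {
    "temperature": (r"temp:([\d.]+)", "float"),
    "humidity": (r"hum:(\d+)", "int"),
    "status": (r"status:(true|false)", "bool"),
}
TIMESTAMP_PATTERN = r"ts:(\d+)"  # from "ts:(\\d+):ms" with the ":ms" unit split off
CASTS = {"float": float, "int": int, "bool": lambda s: s == "true", "string": str}


def first_group(pattern: str, text: str):
    """Return the first capture group of the first match, or None."""
    match = re.search(pattern, text)
    return match.group(1) if match else None


def parse_text_message(text: str) -> dict:
    """Extract tags, typed fields, and a nanosecond timestamp from one text message."""
    tags = {}
    for name, pattern in TAG_PATTERNS.items():
        value = first_group(pattern, text)
        if value is not None:
            tags[name] = value
    fields = {}
    for name, (pattern, type_name) in FIELD_PATTERNS.items():
        value = first_group(pattern, text)
        if value is not None:
            fields[name] = CASTS[type_name](value)
    raw_ts = first_group(TIMESTAMP_PATTERN, text)
    time_ns = int(raw_ts) * 1_000_000 if raw_ts is not None else None  # ms to ns
    return {"tags": tags, "fields": fields, "time_ns": time_ns}


parsed = parse_text_message("location=warehouse_a,temp:22.5,hum:65,status:true,ts:1638360000000")
print(parsed)
```

Each pattern contributes its first capture group, so the example message yields one tag, three typed fields, and a timestamp converted from milliseconds to nanoseconds.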
Statistics and Monitoring
The plugin tracks comprehensive statistics and writes them to the amqp_stats table every 10 plugin calls.
Important notes:
- Statistics are written every 10 plugin invocations, not based on time intervals
- Each queue is tracked separately with independent statistics
- Statistics persist across plugin restarts using the InfluxDB cache
amqp_stats Table
| Field | Type | Description |
|---|---|---|
| queue (tag) | tag | AMQP queue name |
| host (tag) | tag | AMQP broker address |
| virtual_host (tag) | tag | AMQP virtual host |
| received | int | Messages received in current period |
| processed | int | Messages processed in current period |
| failed | int | Messages failed in current period |
| success_rate | float | Success rate for current period (%) |
| total_received | int | Total messages received (all time) |
| total_processed | int | Total messages processed (all time) |
| total_failed | int | Total messages failed (all time) |
| total_success_rate | float | Total success rate (all time, %) |
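The relationship between the per-period and all-time columns can be sketched as a counter that emits a row on every tenth call. This is an assumption-laden illustration: the class QueueStats is hypothetical, the success rate is assumed to be processed divided by received, and resetting the period counters after each write is inferred from the "current period" wording rather than confirmed by the plugin source.

```python
COUNTERS = ("received", "processed", "failed")


def success_rate(counts: dict) -> float:
    """Assumed formula: processed / received as a percentage (0.0 when nothing was received)."""
    return 100.0 * counts["processed"] / counts["received"] if counts["received"] else 0.0


class QueueStats:
    """Per-queue counters that produce a stats row every `flush_every` plugin calls."""

    def __init__(self, flush_every: int = 10):
        self.flush_every = flush_every
        self.calls = 0
        self.period = dict.fromkeys(COUNTERS, 0)
        self.total = dict.fromkeys(COUNTERS, 0)

    def record(self, received: int, processed: int, failed: int):
        """Record one plugin call; return a row on every Nth call, otherwise None."""
        self.calls += 1
        for name, value in zip(COUNTERS, (received, processed, failed)):
            self.period[name] += value
            self.total[name] += value
        if self.calls % self.flush_every != 0:
            return None
        row = dict(self.period)
        row["success_rate"] = success_rate(self.period)
        row.update({f"total_{name}": value for name, value in self.total.items()})
        row["total_success_rate"] = success_rate(self.total)
        self.period = dict.fromkeys(COUNTERS, 0)  # assumption: period resets after each write
        return row
```

With a trigger spec of every:10s, this cadence would produce roughly one row per queue every 100 seconds.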
Querying Statistics
# Get latest statistics
influxdb3 query --database mydb \
"SELECT * FROM amqp_stats ORDER BY time DESC LIMIT 10"
# Success rate over time
influxdb3 query --database mydb \
"SELECT queue, success_rate, processed, failed
FROM amqp_stats
WHERE time > now() - INTERVAL '1 hour'
ORDER BY time DESC"
Error Handling
Parse errors and message processing failures are logged to the amqp_exceptions table:
amqp_exceptions Table
| Field | Type | Description |
|---|---|---|
| queue (tag) | tag | AMQP queue where error occurred |
| error_type (tag) | tag | Type of error (e.g., JSONDecodeError) |
| error_message | string | Detailed error message |
| raw_message | string | Original AMQP message (truncated to 1KB) |
Checking for Errors
influxdb3 query --database mydb \
"SELECT * FROM amqp_exceptions ORDER BY time DESC LIMIT 10"
Architecture
How It Works
- Scheduled Trigger: Plugin runs on schedule (e.g., every:10s)
- Configuration Caching: Plugin configuration is parsed once and cached between executions
- Message Retrieval: Plugin connects, retrieves up to max_messages from queue using basic_get
- Parse & Write: Messages parsed according to format and written to InfluxDB
- Acknowledgement: Messages acknowledged based on ack_policy setting
- Error Tracking: Parse errors logged to amqp_exceptions table
- Statistics: Written to amqp_stats table every 10 plugin calls
- Disconnect: Connection closed after processing
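The retrieval and acknowledgement steps can be sketched as a polling loop. This is a simplified sketch, not the plugin's implementation: drain_queue and the handle callback are hypothetical names, and channel is assumed to behave like a pika BlockingChannel, whose basic_get returns (None, None, None) when the queue is empty. Connection setup, writing to InfluxDB, and error logging are left out.

```python
def drain_queue(channel, queue, handle, max_messages=500,
                ack_policy="on_success", requeue_on_failure=False):
    """Pull up to max_messages with basic_get, process each, and acknowledge per policy."""
    stats = {"received": 0, "processed": 0, "failed": 0}
    deferred_tags = []  # delivery tags acknowledged at the end when ack_policy is 'always'
    for _ in range(max_messages):
        method, _properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is None:  # queue is empty
            break
        stats["received"] += 1
        try:
            handle(body)  # parse the message and write the resulting points
            succeeded = True
        except Exception:
            succeeded = False
        stats["processed" if succeeded else "failed"] += 1
        if ack_policy == "always":
            deferred_tags.append(method.delivery_tag)
        elif succeeded:
            channel.basic_ack(delivery_tag=method.delivery_tag)
        else:
            channel.basic_reject(delivery_tag=method.delivery_tag, requeue=requeue_on_failure)
    for tag in deferred_tags:
        channel.basic_ack(delivery_tag=tag)
    return stats
```

Because the loop is bounded by max_messages, a message that is rejected with requeue enabled and redelivered immediately cannot keep a single scheduled call running indefinitely.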
Performance Optimization
The plugin includes several optimizations for high-throughput scenarios:
- Configuration Caching: Plugin configuration is parsed once and reused across all trigger executions
- Pre-compiled Patterns: JSONPath expressions and regex patterns are compiled once during parser initialization, not per-message
- Batch Retrieval: Multiple messages retrieved in a single connection session
- QoS Control: Prefetch count limits memory usage on consumer side