Scheduled

AMQP Subscriber

The AMQP Subscriber plugin brings real-time AMQP messages from RabbitMQ and similar brokers into InfluxDB 3 without custom consumers. It simplifies ingestion for IoT, event-driven monitoring, and sensor pipelines, reducing pipeline fragility and getting operational data into InfluxDB faster with less engineering overhead.

Configuration

Plugin parameters may be specified as key-value pairs in the --trigger-arguments flag (CLI) or in the trigger_arguments field (API) when creating a trigger. This plugin supports TOML configuration files for complex mapping scenarios, which can be specified using the config_file_path parameter.
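
As an illustration of the key-value form, the following sketch joins a parameter mapping into one string of the kind passed to --trigger-arguments. It assumes the comma-separated key=value form and values that contain no commas. The helper name build_trigger_arguments and the sample values are hypothetical and are not part of the plugin.

```python
def build_trigger_arguments(params: dict) -> str:
    """Join parameters into a comma-separated key=value string.

    Assumption: pairs are separated by commas, so a value that itself
    contains a comma cannot be expressed in this simple form.
    """
    for key, value in params.items():
        if "," in str(value):
            raise ValueError(f"value for {key!r} contains a comma")
    return ",".join(f"{key}={value}" for key, value in params.items())


# Hypothetical sample values for a JSON-format trigger.
args = build_trigger_arguments({
    "host": "localhost",
    "queues": "sensor_data alerts",
    "table_name": "sensors",
    "fields": "temp:float=temperature",
})
print(args)
```

Because values such as queues and fields contain spaces, the resulting string generally needs to be quoted when it is passed on a shell command line.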

Plugin metadata

This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the InfluxDB 3 Explorer UI to display and configure the plugin.

Required parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| host | string | required | AMQP broker hostname or IP address |
| queues | string | required | Space-separated list of queue names to consume messages from (e.g., “sensor_data alerts”) |
| table_name | string | required (json/text only) | Static InfluxDB measurement name. Required for json/text formats if table_name_field is not set. Not required for lineprotocol. |
| table_name_field | string | none | Field path to extract the table name dynamically from message data. JSON: field name (auto-prefixed with $.). Text: regex pattern with capture group. Alternative to table_name. |

Connection parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| port | integer | 5672 | AMQP broker port (5672 for non-TLS, 5671 for TLS) |
| virtual_host | string | “/” | AMQP virtual host |

Authentication parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| username | string | “guest” | AMQP broker username |
| password | string | “guest” | AMQP broker password |

TLS/SSL parameters

| Parameter (CLI) | Parameter (TOML) | Type | Default | Description |
| --- | --- | --- | --- | --- |
| ssl_ca_cert | [amqp.ssl] ca_cert | string | none | Path to CA certificate file. Enables SSL when provided. |
| ssl_client_cert | [amqp.ssl] client_cert | string | none | Path to client certificate for mutual TLS |
| ssl_client_key | [amqp.ssl] client_key | string | none | Path to client private key for mutual TLS |

Note: For mutual TLS, both ssl_client_cert and ssl_client_key must be provided together. SSL is automatically enabled when ssl_ca_cert is provided.
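
The rule in the note above can be sketched as a small validation step. This is an illustration only, not the plugin's code: the function name check_tls_settings and the returned keys are hypothetical, and the behavior when a client certificate is given without ssl_ca_cert is not described here, so the sketch does not decide it.

```python
from typing import Optional


def check_tls_settings(ssl_ca_cert: Optional[str] = None,
                       ssl_client_cert: Optional[str] = None,
                       ssl_client_key: Optional[str] = None) -> dict:
    """Validate TLS parameters according to the documented rules."""
    # The client certificate and key are only valid as a pair.
    if bool(ssl_client_cert) != bool(ssl_client_key):
        raise ValueError(
            "ssl_client_cert and ssl_client_key must be provided together"
        )
    return {
        # SSL is switched on by the presence of a CA certificate path.
        "ssl_enabled": bool(ssl_ca_cert),
        # Both client files present means mutual TLS was requested.
        "client_auth_requested": bool(ssl_client_cert and ssl_client_key),
    }


print(check_tls_settings(ssl_ca_cert="certs/ca.crt"))
```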

Message handling parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| ack_policy | string | “on_success” | When to acknowledge messages: on_success or always |
| requeue_on_failure | string | “false” | Whether to requeue messages that fail processing. When true, failed messages are returned to the queue for retry. |
| max_messages | integer | 500 | Maximum number of messages to retrieve per scheduled call |

Acknowledgement policies:

  • on_success: Acknowledge only after successful processing. Failed messages are rejected (requeue controlled by requeue_on_failure).
  • always: Acknowledge all messages at the end of processing, regardless of success or failure.
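
The two policies can be read as a small decision table. The sketch below is one way to express it, under the assumption that each message ends in either an acknowledgement or a rejection. The function name decide_ack is hypothetical, and the sketch ignores timing (the always policy acknowledges at the end of processing).

```python
def decide_ack(ack_policy: str, success: bool,
               requeue_on_failure: bool = False) -> tuple:
    """Return (action, requeue) for one processed message."""
    if ack_policy == "always":
        # Acknowledged regardless of the processing outcome.
        return ("ack", False)
    if ack_policy == "on_success":
        if success:
            return ("ack", False)
        # Failed messages are rejected; requeue_on_failure decides
        # whether the broker returns them to the queue.
        return ("reject", requeue_on_failure)
    raise ValueError(f"unknown ack_policy: {ack_policy!r}")


print(decide_ack("on_success", success=False, requeue_on_failure=True))
```

With on_success and requeue_on_failure set to true, a message that keeps failing is redelivered on each retry, so that combination suits transient failures better than malformed payloads.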

Message format parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| format | string | “json” | Message format: json, lineprotocol, or text |
| timestamp_field | string | none | Field containing timestamp (format depends on message format) |

Format-specific timestamp_field syntax:

The timestamp field format differs between JSON and Text formats:

| Format | Syntax | Split method | Example (CLI) | Example (TOML) |
| --- | --- | --- | --- | --- |
| JSON | field_name:format | Split by first : | "timestamp:ms" | "$.timestamp:ms" |
| Text | regex:format | Split by last : | "ts:(\\d+):ms" | "ts:(\\d+):ms" |

Note: In CLI arguments, JSON paths are specified without $. prefix (added automatically). In TOML configuration, use full JSONPath syntax with $. prefix.

Note: Text format uses the last colon to split, allowing regex patterns to contain colons (e.g., time patterns).
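
The difference between the two split rules is easiest to see side by side. The following is a minimal sketch, not the plugin's parser; the function name split_timestamp_spec is hypothetical. The regex is written with single backslashes because Python raw strings need no extra escaping.

```python
def split_timestamp_spec(spec: str, message_format: str) -> tuple:
    """Split a timestamp_field value into (selector, time_format)."""
    if message_format == "json":
        # JSON: everything before the first colon is the field name.
        selector, sep, time_format = spec.partition(":")
    elif message_format == "text":
        # Text: everything before the last colon is the regex, so the
        # pattern itself may contain colons.
        selector, sep, time_format = spec.rpartition(":")
    else:
        raise ValueError(f"unsupported format: {message_format!r}")
    if not (sep and selector and time_format):
        raise ValueError(f"malformed timestamp_field: {spec!r}")
    return selector, time_format


print(split_timestamp_spec("timestamp:ms", "json"))
print(split_timestamp_spec(r"ts:(\d+):ms", "text"))
```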

Supported timestamp formats:

  • ns: nanoseconds (Unix timestamp)
  • ms: milliseconds (Unix timestamp)
  • s: seconds (Unix timestamp)
  • datetime: ISO 8601 string (e.g., “2021-12-01T12:00:00Z”)
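
To make the four formats concrete, the sketch below normalizes each one to nanoseconds since the Unix epoch. Several things here are assumptions rather than documented plugin behavior: that nanoseconds are the target unit, that Unix values arrive as integer strings, and that a datetime without an offset is UTC. The function name to_nanoseconds is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_nanoseconds(raw: str, time_format: str) -> int:
    """Convert a raw timestamp string to nanoseconds since the epoch."""
    if time_format == "ns":
        return int(raw)
    if time_format == "ms":
        return int(raw) * 1_000_000
    if time_format == "s":
        return int(raw) * 1_000_000_000
    if time_format == "datetime":
        # Older Python versions do not accept a trailing "Z" in
        # fromisoformat, so it is rewritten as an explicit UTC offset.
        parsed = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: UTC
        # Integer arithmetic avoids float rounding at nanosecond scale.
        micros = (parsed - EPOCH) // timedelta(microseconds=1)
        return micros * 1_000
    raise ValueError(f"unsupported timestamp format: {time_format!r}")


print(to_nanoseconds("2021-12-01T12:00:00Z", "datetime"))
```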

JSON format parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tags | string | none | Space-separated tag names. Example: “room sensor location” |
| fields | string | required | Space-separated field mappings. Format: “name:type=jsonpath” without $. |

Field specification format: "temp:float=temperature hum:int=humidity status:bool=online"

Supported field types: int, uint, float, string, bool
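
As a rough illustration of how a field specification maps onto a JSON message, the sketch below parses the name:type=jsonpath items and applies them to a decoded payload. It is not the plugin's implementation: the function names are hypothetical, the path is treated as a plain dot-separated key lookup rather than full JSONPath, and the sample message is invented.

```python
import json


def to_uint(value):
    number = int(value)
    if number < 0:
        raise ValueError(f"negative value for uint: {value!r}")
    return number


def to_bool(value):
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return bool(value)


CASTS = {"int": int, "uint": to_uint, "float": float,
         "string": str, "bool": to_bool}


def parse_field_specs(spec: str) -> list:
    """Parse 'name:type=path' items separated by whitespace."""
    parsed = []
    for item in spec.split():
        head, _, path = item.partition("=")
        name, _, type_name = head.partition(":")
        if not (name and path) or type_name not in CASTS:
            raise ValueError(f"malformed field spec: {item!r}")
        parsed.append((name, type_name, path))
    return parsed


def extract_fields(message: dict, spec: str) -> dict:
    """Look up each path in the message and cast to the declared type."""
    fields = {}
    for name, type_name, path in parse_field_specs(spec):
        value = message
        for key in path.split("."):  # assumption: simple dotted path
            value = value[key]
        fields[name] = CASTS[type_name](value)
    return fields


# Hypothetical sample payload.
message = json.loads('{"temperature": 22.5, "humidity": 45, "online": true}')
spec = "temp:float=temperature hum:int=humidity status:bool=online"
print(extract_fields(message, spec))
```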

Text format parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| tags | string | none | Space-separated tag patterns. Format: “name=regex_pattern” |
| fields | string | required | Space-separated field patterns. Format: “name:type=regex_pattern” |

Tag specification format: "room=room:([^,\\s]+) sensor=sensor:(\\w+)"

Field specification format: "temp:float=temp:([\\d.]+) status:bool=(true|false)"
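
The tag and field patterns above can be tried against a sample line with a short script. This is a sketch under stated assumptions, not the plugin's code: the function name extract_from_text and the sample line are hypothetical, each pattern is assumed to contain exactly one capture group and no spaces, and unmatched patterns are simply skipped. The patterns use single backslashes because they are Python raw strings.

```python
import re

CASTS = {
    "int": int,
    "uint": int,  # sketch only: no sign check
    "float": float,
    "string": str,
    "bool": lambda text: text.strip().lower() == "true",
}


def extract_from_text(line: str, tag_spec: str, field_spec: str) -> tuple:
    """Apply 'name=regex' tag specs and 'name:type=regex' field specs."""
    tags = {}
    for item in tag_spec.split():
        name, _, pattern = item.partition("=")
        match = re.search(pattern, line)
        if match:
            tags[name] = match.group(1)

    fields = {}
    for item in field_spec.split():
        head, _, pattern = item.partition("=")
        name, _, type_name = head.partition(":")
        match = re.search(pattern, line)
        if match:
            fields[name] = CASTS[type_name](match.group(1))
    return tags, fields


# Hypothetical sample message.
line = "room:kitchen,sensor:s1 temp:22.5 status:true"
tags, fields = extract_from_text(
    line,
    r"room=room:([^,\s]+) sensor=sensor:(\w+)",
    r"temp:float=temp:([\d.]+) status:bool=(true|false)",
)
print(tags, fields)
```

Because the specification is split on whitespace, a pattern that needs to match a space has to express it as \s rather than a literal space.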

TOML configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| config_file_path | string | none | Path to TOML config file (absolute or relative) |

To use a TOML configuration file, specify the config_file_path in the trigger arguments.

File path resolution

All file paths in the plugin (configuration file, TLS certificates) follow the same resolution logic:

  • Absolute paths (e.g., /etc/amqp/config.toml) are used as-is
  • Relative paths (e.g., config.toml, certs/ca.crt) are resolved from PLUGIN_DIR environment variable

If a relative path is specified and PLUGIN_DIR is not set, the plugin will return an error.
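
The resolution rules above amount to a few lines of logic. The sketch below illustrates them on a POSIX-style file system; the function name resolve_path and the error type are hypothetical, and the plugin's actual error message may differ.

```python
import os
from typing import Mapping, Optional


def resolve_path(path: str, env: Optional[Mapping] = None) -> str:
    """Resolve a config or certificate path as described above."""
    env = os.environ if env is None else env
    if os.path.isabs(path):
        # Absolute paths are used as-is.
        return path
    plugin_dir = env.get("PLUGIN_DIR")
    if not plugin_dir:
        # A relative path cannot be resolved without PLUGIN_DIR.
        raise ValueError(
            f"relative path {path!r} requires PLUGIN_DIR to be set"
        )
    return os.path.join(plugin_dir, path)


print(resolve_path("certs/ca.crt", {"PLUGIN_DIR": "/opt/plugins"}))
```

Passing the environment as an argument keeps the function easy to test; called with no second argument, it reads the real process environment.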

Example TOML configuration

amqp_config_example.toml - comprehensive configuration example with all three message formats
