Kafka Subscriber
The Kafka Subscriber plugin brings real-time Kafka data into InfluxDB 3 without custom consumers or transformation services. It simplifies ingestion for IoT, observability, and streaming pipelines, making Kafka data immediately useful for real-time analysis with less infrastructure and engineering overhead.
Configuration
Plugin parameters may be specified as key-value pairs in the --trigger-arguments flag (CLI) or in the trigger_arguments field (API) when creating a trigger. This plugin supports TOML configuration files for complex mapping scenarios, which can be specified using the config_file_path parameter.
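As a rough illustration of how the key-value pairs might be assembled, the sketch below joins a dictionary of parameters into a single string. It assumes the --trigger-arguments flag takes comma-separated key=value pairs, which this section does not state; the helper name build_trigger_arguments and the group ID value are hypothetical.

```python
def build_trigger_arguments(params: dict[str, str]) -> str:
    """Join plugin parameters into one key=value,key=value string."""
    parts = []
    for key, value in params.items():
        # Under the assumed comma-separated syntax, a comma inside a value
        # would be read as the start of the next pair, so reject it early.
        if "," in value:
            raise ValueError(f"value for {key!r} must not contain a comma")
        parts.append(f"{key}={value}")
    return ",".join(parts)


arguments = build_trigger_arguments({
    "bootstrap_servers": "kafka1:9092 kafka2:9092",
    "topics": "sensor_data metrics",
    "group_id": "influxdb3_sensors",  # hypothetical consumer group ID
    "format": "lineprotocol",
})
print(arguments)
```

If a value cannot be expressed this way (for example, a regex pattern that contains a comma), a TOML configuration file is likely the better fit.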
Plugin metadata
This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the InfluxDB 3 Explorer UI to display and configure the plugin.
Required parameters
In TOML configuration, bootstrap_servers, topics, and group_id are placed under the [kafka] section; table_name and table_name_field are placed under the [mapping.json] or [mapping.text] section.
| Parameter | Type | Default | Description |
|---|---|---|---|
| bootstrap_servers | string | required | Space-separated list of Kafka broker addresses (e.g., “kafka1:9092 kafka2:9092”) |
| topics | string | required | Space-separated list of topics (e.g., “sensor_data metrics”) |
| group_id | string | required | Kafka consumer group ID (must be unique per consumer group) |
| table_name | string | required (json/text only) | InfluxDB measurement name for storing data. Not required for lineprotocol format or when table_name_field is set. |
| table_name_field | string | none | JSON field name or regex pattern to extract table name dynamically from each message. Alternative to static table_name. |
Connection parameters
In TOML configuration, these parameters are placed under the [kafka] section.
| Parameter | Type | Default | Description |
|---|---|---|---|
| auto_offset_reset | string | “earliest” | Where to start consuming on first connect: “earliest” or “latest” |
| max_poll_records | int | 500 | Maximum messages per scheduled call. Set to 0 for unlimited. |
Offset commit policy
In TOML configuration, this parameter is placed under the [kafka] section.
| Parameter | Type | Default | Description |
|---|---|---|---|
| offset_commit_policy | string | “on_success” | When to commit offsets: “on_success” or “always” |
Policy behavior:
| Policy | Behavior |
|---|---|
| on_success | Commit offsets only after ALL messages in the batch are successfully processed and written |
| always | Commit offsets immediately after receiving messages, regardless of processing success |
When to use each policy:
- on_success (recommended): Use when data integrity is important. Failed messages will be reprocessed on the next trigger execution.
- always: Use in high-throughput scenarios where occasional data loss is acceptable, or when you have external error handling (failed messages are logged to the kafka_exceptions table).
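The difference between the two policies can be sketched as a small decision function. This is a simplified model of the behavior described above, not the plugin's code: should_commit, flaky_write, and the sample batch are hypothetical names, and the write callback stands in for parsing a message and writing it to InfluxDB.

```python
from typing import Callable, Iterable


def should_commit(
    messages: Iterable[str],
    write: Callable[[str], None],
    policy: str = "on_success",
) -> bool:
    """Return True if the batch's offsets would be committed under the policy."""
    if policy not in ("on_success", "always"):
        raise ValueError(f"unknown offset_commit_policy: {policy!r}")

    failures = 0
    for message in messages:
        try:
            write(message)
        except Exception:
            failures += 1  # this sketch only counts failures

    if policy == "always":
        return True  # offsets are committed regardless of failures
    return failures == 0  # on_success: one failure blocks the commit


def flaky_write(message: str) -> None:
    """Hypothetical writer that rejects one malformed message."""
    if message == "bad":
        raise ValueError("cannot parse message")


batch = ["ok-1", "bad", "ok-2"]
print(should_commit(batch, flaky_write, "on_success"))  # False
print(should_commit(batch, flaky_write, "always"))      # True
```

With on_success, the uncommitted batch is fetched again on the next trigger execution, so messages that were already written may be written a second time.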
Security parameters
In TOML configuration, this parameter is placed under the [kafka] section.
| Parameter | Type | Default | Description |
|---|---|---|---|
| security_protocol | string | “PLAINTEXT” | Security protocol: “PLAINTEXT”, “SSL”, “SASL_PLAINTEXT”, “SASL_SSL” |
SASL authentication parameters
In TOML configuration, these parameters are placed under the [kafka.sasl] section with shortened names.
| Parameter (CLI) | Parameter (TOML) | Type | Default | Description |
|---|---|---|---|---|
| sasl_mechanism | [kafka.sasl] mechanism | string | none | SASL mechanism: “PLAIN”, “SCRAM-SHA-256”, “SCRAM-SHA-512” |
| sasl_username | [kafka.sasl] username | string | none | SASL username (required with sasl_mechanism) |
| sasl_password | [kafka.sasl] password | string | none | SASL password (required with sasl_mechanism) |
Note: All three SASL parameters must be provided together when using SASL authentication.
SSL/TLS parameters
In TOML configuration, these parameters are placed under the [kafka.ssl] section with shortened names.
| Parameter (CLI) | Parameter (TOML) | Type | Default | Description |
|---|---|---|---|---|
| ssl_ca_cert | [kafka.ssl] ca_cert | string | none | Path to CA certificate file |
| ssl_cert | [kafka.ssl] client_cert | string | none | Path to client certificate for mutual TLS |
| ssl_key | [kafka.ssl] client_key | string | none | Path to client private key for mutual TLS |
| ssl_key_password | [kafka.ssl] key_password | string | none | Password for encrypted client private key |
Note: For mutual TLS, both ssl_cert and ssl_key must be provided together.
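The two "provided together" rules (all three SASL parameters, and ssl_cert with ssl_key) can be checked before a trigger is created. The following is a minimal sketch using the CLI parameter names from the tables above; validate_security is a hypothetical helper, and the plugin's own validation and error messages may differ.

```python
def validate_security(params: dict[str, str]) -> list[str]:
    """Return a list of problems with the SASL and TLS parameter combination."""
    errors = []

    # Rule 1: the three SASL parameters are all-or-nothing.
    sasl_keys = ("sasl_mechanism", "sasl_username", "sasl_password")
    present = [key for key in sasl_keys if params.get(key)]
    if present and len(present) != len(sasl_keys):
        missing = [key for key in sasl_keys if key not in present]
        errors.append("SASL requires all three parameters; missing: " + ", ".join(missing))

    # Rule 2: mutual TLS needs both the client certificate and its key.
    has_cert = bool(params.get("ssl_cert"))
    has_key = bool(params.get("ssl_key"))
    if has_cert != has_key:
        errors.append("mutual TLS requires both ssl_cert and ssl_key")

    return errors


problems = validate_security({
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-256",
    "sasl_username": "ingest",          # hypothetical user name
    "ssl_cert": "certs/client.crt",     # hypothetical path; ssl_key is missing
})
for problem in problems:
    print(problem)
```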
Message format parameters
In TOML configuration, format is placed under the [kafka] section; timestamp_field is placed under the [mapping.json] or [mapping.text] section.
| Parameter | Type | Default | Description |
|---|---|---|---|
| format | string | “json” | Message format: json, lineprotocol, or text |
| timestamp_field | string | none | Field containing timestamp (format depends on message format) |
Supported formats:
| Format | Description |
|---|---|
| json | JSON with JSONPath field mapping |
| lineprotocol | InfluxDB Line Protocol passthrough |
| text | Plain text with regex-based parsing |
Format-specific timestamp_field syntax:
| Format | Syntax | Split Method | Example (CLI) | Example (TOML) |
|---|---|---|---|---|
| JSON | field_name:format | Split by first : | "timestamp:ms" | "$.timestamp:ms" |
| Text | regex:format | Split by last : | "ts:(\\d+):ms" | "ts:(\\d+):ms" |
Supported timestamp formats:
- ns - nanoseconds (Unix timestamp)
- ms - milliseconds (Unix timestamp)
- s - seconds (Unix timestamp)
- datetime - ISO 8601 string (e.g., “2021-12-01T12:00:00Z”)
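To make the split rules and timestamp formats concrete, here is a minimal sketch that splits a timestamp_field value and converts an extracted timestamp to nanoseconds. The function names are hypothetical, and two behaviors are assumptions rather than documented plugin behavior: Unix timestamps are treated as integers, and a datetime string without a UTC offset is treated as UTC.

```python
from datetime import datetime, timedelta, timezone

UNIX_MULTIPLIERS = {"ns": 1, "ms": 1_000_000, "s": 1_000_000_000}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def split_timestamp_field(spec: str, message_format: str) -> tuple[str, str]:
    """Split a timestamp_field value into (selector, timestamp format)."""
    if message_format == "json":
        selector, sep, ts_format = spec.partition(":")   # split by first ':'
    elif message_format == "text":
        selector, sep, ts_format = spec.rpartition(":")  # split by last ':'
    else:
        raise ValueError(f"timestamp_field is not split for format {message_format!r}")
    if not (sep and selector and ts_format):
        raise ValueError(f"expected 'selector:format', got {spec!r}")
    return selector, ts_format


def to_nanoseconds(value: str, ts_format: str) -> int:
    """Convert an extracted timestamp string to Unix nanoseconds."""
    if ts_format in UNIX_MULTIPLIERS:
        return int(value) * UNIX_MULTIPLIERS[ts_format]  # assumes integer input
    if ts_format == "datetime":
        # Older Python versions do not accept a trailing 'Z' in fromisoformat.
        text = value[:-1] + "+00:00" if value.endswith("Z") else value
        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
        # Integer microseconds since the epoch, scaled to nanoseconds.
        return (parsed - EPOCH) // timedelta(microseconds=1) * 1_000
    raise ValueError(f"unsupported timestamp format: {ts_format!r}")


selector, ts_format = split_timestamp_field(r"ts:(\d+):ms", "text")
print(selector, ts_format)
print(to_nanoseconds("1638360000000", ts_format))
print(to_nanoseconds("2021-12-01T12:00:00Z", "datetime"))
```

Splitting text specifications at the last colon keeps any colons inside the regex pattern intact, which is why the two formats use different rules.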
JSON format parameters
In TOML configuration, tags are placed under the [mapping.json.tags] section and fields under the [mapping.json.fields] section.
| Parameter | Type | Default | Description |
|---|---|---|---|
| tags | string | none | Space-separated tag names. Example: “room sensor location” |
| fields | string | required | Space-separated field mappings. Format: “name:type=jsonpath” without $. |
Field specification format: "temp:float=temperature hum:int=humidity status:bool=online"
Supported field types: int, uint, float, string, bool
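The field specification format can be read as three parts per entry: an InfluxDB field name, a type, and the JSON path to read from. The sketch below parses the example string into those parts; parse_fields and FieldMapping are hypothetical names, and the plugin's actual parser may validate differently.

```python
from typing import NamedTuple

SUPPORTED_TYPES = {"int", "uint", "float", "string", "bool"}


class FieldMapping(NamedTuple):
    name: str        # field name written to InfluxDB
    field_type: str  # one of SUPPORTED_TYPES
    path: str        # JSON path, written without the leading $


def parse_fields(spec: str) -> list[FieldMapping]:
    """Parse a space-separated 'name:type=jsonpath' specification."""
    mappings = []
    for item in spec.split():
        name_and_type, equals, path = item.partition("=")
        name, colon, field_type = name_and_type.partition(":")
        if not (equals and colon and name and path):
            raise ValueError(f"expected 'name:type=jsonpath', got {item!r}")
        if field_type not in SUPPORTED_TYPES:
            raise ValueError(f"unsupported field type {field_type!r} in {item!r}")
        mappings.append(FieldMapping(name, field_type, path))
    return mappings


for mapping in parse_fields("temp:float=temperature hum:int=humidity status:bool=online"):
    print(mapping.name, mapping.field_type, mapping.path)
```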
Text format parameters
In TOML configuration, tags are placed under the [mapping.text.tags] section and fields under the [mapping.text.fields] section.
| Parameter | Type | Default | Description |
|---|---|---|---|
| tags | string | none | Space-separated tag patterns. Format: “name=regex_pattern” |
| fields | string | required | Space-separated field patterns. Format: “name:type=regex_pattern” |
TOML configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| config_file_path | string | none | Path to TOML config file (absolute or relative) |
File path resolution
All file paths in the plugin (configuration file, TLS certificates) follow the same resolution logic:
- Absolute paths (e.g., /etc/kafka/config.toml) are used as-is
- Relative paths (e.g., config.toml, certs/ca.crt) are resolved from the PLUGIN_DIR environment variable
If a relative path is specified and PLUGIN_DIR is not set, the plugin will return an error.
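The resolution logic above can be sketched in a few lines. This is an illustration of the described rules, not the plugin's code: resolve_path is a hypothetical helper, paths are treated as POSIX paths, and the environment is passed in as a mapping so the example does not depend on the real process environment.

```python
import os
from pathlib import PurePosixPath
from typing import Mapping, Optional


def resolve_path(path: str, env: Optional[Mapping[str, str]] = None) -> PurePosixPath:
    """Resolve a config or certificate path using the rules described above."""
    env = os.environ if env is None else env
    candidate = PurePosixPath(path)
    if candidate.is_absolute():
        return candidate  # absolute paths are used as-is
    plugin_dir = env.get("PLUGIN_DIR")
    if not plugin_dir:
        # Relative path without PLUGIN_DIR: report an error rather than guess.
        raise ValueError(f"PLUGIN_DIR is not set; cannot resolve relative path {path!r}")
    return PurePosixPath(plugin_dir) / candidate


example_env = {"PLUGIN_DIR": "/var/lib/influxdb3/plugins"}  # hypothetical directory
print(resolve_path("/etc/kafka/config.toml", example_env))
print(resolve_path("certs/ca.crt", example_env))
```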
Example TOML configuration
kafka_config_example.toml - comprehensive configuration example with all formats and security options