‹ Plugins / Kafka Subscriber
Scheduled

Kafka Subscriber

The Kafka Subscriber plugin brings real-time Kafka data into InfluxDB 3 without custom consumers or transformation services. It simplifies ingestion for IoT, observability, and streaming pipelines, making Kafka data immediately useful for real-time analysis with less infrastructure and engineering overhead.

Configuration

Plugin parameters may be specified as key-value pairs in the --trigger-arguments flag (CLI) or in the trigger_arguments field (API) when creating a trigger. This plugin supports TOML configuration files for complex mapping scenarios, which can be specified using the config_file_path parameter.

Plugin metadata

This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the InfluxDB 3 Explorer UI to display and configure the plugin.

Required parameters

In TOML configuration, bootstrap_servers, topics, and group_id are placed under the [kafka] section; table_name and table_name_field are placed under [mapping.json] or [mapping.text] section.

Parameter Type Default Description
bootstrap_servers string required Space-separated list of Kafka broker addresses (e.g., “kafka1:9092 kafka2:9092”)
topics string required Space-separated list of topics (e.g., “sensor_data metrics”)
group_id string required Kafka consumer group ID (must be unique per consumer group)
table_name string required (json/text only) InfluxDB measurement name for storing data. Not required for lineprotocol format or when table_name_field is set.
table_name_field string none JSON field name or regex pattern to extract table name dynamically from each message. Alternative to static table_name.

Connection parameters

In TOML configuration, these parameters are placed under the [kafka] section.

Parameter Type Default Description
auto_offset_reset string “earliest” Where to start consuming on first connect: “earliest” or “latest”
max_poll_records int 500 Maximum messages per scheduled call. Set to 0 for unlimited.

Offset Commit Policy

In TOML configuration, this parameter is placed under the [kafka] section.

Parameter Type Default Description
offset_commit_policy string “on_success” When to commit offsets: “on_success” or “always”

Policy behavior:

Policy Behavior
on_success Commit offsets only after ALL messages in the batch are successfully processed and written
always Commit offsets immediately after receiving messages, regardless of processing success

When to use each policy:

  • on_success (recommended): Use when data integrity is important. Failed messages will be reprocessed on the next trigger execution.
  • always: Use in high-throughput scenarios where occasional data loss is acceptable, or when you have external error handling (failed messages are logged to kafka_exceptions table).

Security parameters

In TOML configuration, this parameter is placed under the [kafka] section.

Parameter Type Default Description
security_protocol string “PLAINTEXT” Security protocol: “PLAINTEXT”, “SSL”, “SASL_PLAINTEXT”, “SASL_SSL”

SASL Authentication parameters

In TOML configuration, these parameters are placed under the [kafka.sasl] section with shortened names.

Parameter (CLI) Parameter (TOML) Type Default Description
sasl_mechanism [kafka.sasl] mechanism string none SASL mechanism: “PLAIN”, “SCRAM-SHA-256”, “SCRAM-SHA-512”
sasl_username [kafka.sasl] username string none SASL username (required with sasl_mechanism)
sasl_password [kafka.sasl] password string none SASL password (required with sasl_mechanism)

Note: All three SASL parameters must be provided together when using SASL authentication.

SSL/TLS parameters

In TOML configuration, these parameters are placed under the [kafka.ssl] section with shortened names.

Parameter (CLI) Parameter (TOML) Type Default Description
ssl_ca_cert [kafka.ssl] ca_cert string none Path to CA certificate file
ssl_cert [kafka.ssl] client_cert string none Path to client certificate for mutual TLS
ssl_key [kafka.ssl] client_key string none Path to client private key for mutual TLS
ssl_key_password [kafka.ssl] key_password string none Password for encrypted client private key

Note: For mutual TLS, both ssl_cert and ssl_key must be provided together.

Message format parameters

In TOML configuration, format is placed under the [kafka] section; timestamp_field is placed under [mapping.json] or [mapping.text] section.

Parameter Type Default Description
format string “json” Message format: json, lineprotocol, or text
timestamp_field string none Field containing timestamp (format depends on message format)

Supported formats:

Format Description
json JSON with JSONPath field mapping
lineprotocol InfluxDB Line Protocol passthrough
text Plain text with regex-based parsing

Format-specific timestamp_field syntax:

Format Syntax Split Method Example (CLI) Example (TOML)
JSON field_name:format Split by first : "timestamp:ms" "$.timestamp:ms"
Text regex:format Split by last : "ts:(\\d+):ms" "ts:(\\d+):ms"

Supported timestamp formats: - ns - nanoseconds (Unix timestamp) - ms - milliseconds (Unix timestamp) - s - seconds (Unix timestamp) - datetime - ISO 8601 string (e.g., “2021-12-01T12:00:00Z”)

JSON format parameters

In TOML configuration, tags are placed under [mapping.json.tags] section and fields under [mapping.json.fields] section.

Parameter Type Default Description
tags string none Space-separated tag names. Example: “room sensor location”
fields string required Space-separated field mappings. Format: “name:type=jsonpath” without $.

Field specification format: "temp:float=temperature hum:int=humidity status:bool=online"

Supported field types: int, uint, float, string, bool

Text format parameters

In TOML configuration, tags are placed under [mapping.text.tags] section and fields under [mapping.text.fields] section.

Parameter Type Default Description
tags string none Space-separated tag patterns. Format: “name=regex_pattern”
fields string required Space-separated field patterns. Format: “name:type=regex_pattern”

TOML configuration

Parameter Type Default Description
config_file_path string none Path to TOML config file (absolute or relative)

File path resolution

All file paths in the plugin (configuration file, TLS certificates) follow the same resolution logic:

  • Absolute paths (e.g., /etc/kafka/config.toml) are used as-is
  • Relative paths (e.g., config.toml, certs/ca.crt) are resolved from PLUGIN_DIR environment variable

If a relative path is specified and PLUGIN_DIR is not set, the plugin will return an error.

Example TOML configuration

kafka_config_example.toml - comprehensive configuration example with all formats and security options

Ready to get started?

Download InfluxDB 3 and have Kafka Subscriber running in minutes.