Schema Validator
An InfluxDB 3 Processing Engine plugin that validates incoming line protocol data against a user-defined JSON schema. Only data that conforms to the schema is written to a target database or table, enabling a clean data pipeline pattern.
Software Requirements
1. Deploy the plugin files
The plugin code can be deployed via the InfluxDB CLI, Explorer, or GitHub (gh:) trigger paths. However, the schema JSON configuration file must be manually placed in the plugin directory on the server since there is no API for uploading non-plugin files.
schema_validator.py- the plugin code (can be uploaded via CLI/Explorer/GitHub)schema_validator_config.json- your schema definition (must be manually copied to the plugin directory)
2. Create a trigger
Cross-database validation (raw_db -> clean_db):
influxdb3 create trigger \
--database raw_db \
--plugin-filename schema_validator.py \
--trigger-spec \"all_tables\" \
--trigger-arguments schema_file=schema_validator_config.json,target_database=clean_db \
schema_validator_trigger
Same database, different table (with suffix):
influxdb3 create trigger \
--database mydb \
--plugin-filename schema_validator.py \
--trigger-spec \"table:weather\" \
--trigger-arguments schema_file=schema_validator_config.json,target_table_suffix=_clean \
schema_validator_weather
Using a TOML config file:
influxdb3 create trigger \
--database raw_db \
--plugin-filename schema_validator.py \
--trigger-spec \"all_tables\" \
--trigger-arguments config_file_path=schema_validator_trigger_config.toml \
schema_validator_trigger
3. Write data normally
Write to your raw database as usual. The plugin will automatically validate and forward conforming data.
# This row has all required fields -> will be written to clean_db
influxdb3 write --database raw_db \
\"weather,location=us-east,station_id=ST001 temperature=72.5,humidity=45.2\"
# This row is missing required tag 'station_id' -> will be rejected
influxdb3 write --database raw_db \
\"weather,location=us-east temperature=72.5,humidity=45.2\"
Configuration
The schema is defined in a JSON file. Here is the full structure:
{
"allowed_measurements": ["weather", "cpu"],
"tables": {
"weather": {
"target_table": "weather_clean",
"tags": {
"location": {
"required": true,
"allowed_values": ["us-east", "us-west", "eu-west"]
},
"station_id": {
"required": true
},
"region": {
"required": false
}
},
"fields": {
"temperature": {
"required": true,
"type": "float"
},
"humidity": {
"required": true,
"type": "float"
},
"condition": {
"required": false,
"type": "string",
"allowed_values": ["sunny", "cloudy", "rain", "snow"]
}
}
}
}
}
Quick Start
| Argument | Required | Default | Description |
|---|---|---|---|
schema_file |
Yes | - | Path to the JSON schema file (relative to PLUGIN_DIR). |
target_database |
No | (same db) | Database to write validated data to. |
target_table_prefix |
No | "" |
Prefix added to measurement names in the target. |
target_table_suffix |
No | "" |
Suffix added to measurement names in the target. |
log_rejected |
No | "true" |
Log info about rejected rows. |
log_accepted |
No | "false" |
Log info about accepted rows. |
write_rejection_log |
No | "false" |
Write rejection details to _schema_rejections measurement. |
config_file_path |
No | - | Path to TOML config file to override these arguments. |
Examples
IoT Sensor Validation
Ensure sensor readings always have a device_id, valid sensor type, and a numeric value:
{
"allowed_measurements": ["sensor_readings"],
"tables": {
"sensor_readings": {
"target_table": "sensors_validated",
"tags": {
"device_id": { "required": true },
"sensor_type": {
"required": true,
"allowed_values": ["temperature", "pressure", "humidity"]
}
},
"fields": {
"value": { "required": true, "type": "float" },
"status": {
"required": false,
"type": "string",
"allowed_values": ["ok", "warning", "critical"]
}
}
}
}
}
Multi-table with Cross-database
Validate weather and cpu data from raw_db, writing clean data to clean_db:
influxdb3 create trigger \
--database raw_db \
--plugin-filename schema_validator.py \
--trigger-spec \"all_tables\" \
--trigger-arguments schema_file=schema_validator_config.json,target_database=clean_db,write_rejection_log=true \
schema_validator_all
Monitoring Rejections
Query the rejection log to see what data is being rejected and why:
SELECT * FROM _schema_rejections
WHERE time > now() - INTERVAL '1 hour'
ORDER BY time DESC
Use Case
You have data coming into a “raw” database (e.g., raw_db) from various sources. You want to ensure only properly-structured, validated data makes it into your “clean” database (e.g., clean_db). This plugin sits on the WAL flush trigger and validates every incoming row against your schema definition before writing it to the target.
Common patterns:
- raw_db -> validate -> clean_db (cross-database)
- raw_table -> validate -> validated_table (same database, different table)
- source_table -> validate -> source_table_clean (same database, with suffix)
This is a single-file plugin (schema_validator.py) and can be loaded from GitHub via gh: trigger paths or created in InfluxDB Explorer.
Features
- Measurement validation: Define a whitelist of allowed measurement/table names
- Tag validation: Required/optional tags, allowed tag values
- Field validation: Required/optional fields, type checking (float, integer, string, boolean, uint64), allowed field values
- Field stripping: Extra tags/fields not defined in the schema are automatically stripped from the output
- Flexible targeting: Write to a different database, different table name, or add prefix/suffix
- Per-table schemas: Define different validation rules for each measurement
- Rejection logging: Optionally log rejected rows and/or write rejection details to a measurement
- Cached config: Schema file is cached for 5 minutes to avoid repeated file reads
Schema Fields Reference
Top-level
| Field | Type | Description |
|---|---|---|
allowed_measurements |
list[str] (optional) |
Whitelist of allowed measurement/table names. If omitted or empty ([]), no measurement filter is applied — all measurements fall through to the tables rules. |
tables |
dict (required) |
Map of measurement name -> table schema definition. Must contain at least one entry. Only measurements with an entry here will be validated and written to the target. |
Table Schema
| Field | Type | Description |
|---|---|---|
target_table |
str (optional) |
Override the target measurement name. Takes precedence over prefix/suffix args. |
tags |
dict |
Map of tag name -> tag definition. |
fields |
dict |
Map of field name -> field definition. |
Tag Definition
| Field | Type | Description |
|---|---|---|
required |
bool |
If true, the tag must be present on every row. |
allowed_values |
list (optional) |
Whitelist of allowed values for this tag. |
Field Definition
| Field | Type | Description |
|---|---|---|
required |
bool |
If true, the field must be present on every row. |
type |
str (optional) |
Expected data type: "float", "integer", "string", "boolean", "uint64". |
allowed_values |
list (optional) |
Whitelist of allowed values for this field. |
Validation Logic
For each incoming row, the plugin checks (in order):
- Measurement name: Is the table name in
allowed_measurements? (if defined) - Table schema exists: Is there a schema definition for this table in
tables? If not, the table is skipped. - Required tags: Are all required tags present?
- Tag values: Are tag values in the
allowed_valueslist? (if defined) - Required fields: Are all required fields present?
- Field types: Do field values match the expected type? (if defined)
- Field values: Are field values in the
allowed_valueslist? (if defined) - Field stripping: Any extra tags/fields not defined in the schema are stripped from the output.
If any check fails, the row is rejected and not written to the target.
File Structure
schema_validator/
schema_validator.py # Main plugin code
schema_validator_config.json # Example schema definition
schema_validator_trigger_config.toml # Example TOML trigger config
README.md # This file
Notes
- The schema JSON file is cached for 5 minutes. To force a reload, restart the trigger or wait for the cache to expire.
- Extra tags/fields not defined in the schema are silently stripped from the output (not written to the target).
- Measurements without an entry in
tablesare skipped entirely (no data written). - The
target_tableproperty in a table schema takes precedence overtarget_table_prefix/target_table_suffix. - The
_schema_rejectionstable (ifwrite_rejection_log=true) is written to the target database. - Uses
write_sync/write_sync_to_dbwithno_sync=Truefor optimal memory performance. - Valid rows are batched per table and written in a single call for efficiency.
Ready to get started?
Download InfluxDB 3 and have running in minutes.