Quix for InfluxDB

Why use Quix for InfluxDB?

Quix for InfluxDB allows you to build real-time data pipelines that pre-process data and trigger actions—all in Python. If you already know Python, it’s the perfect alternative to Kapacitor and Flux because it can carry out the same common tasks, such as data integration, enrichment, downsampling, and alerting. You can use Quix for any use case where you need to detect events and trigger actions in external systems, e.g. anomaly detection and predictive maintenance.

Quix consists of Quix Streams and Quix Cloud. Quix Streams is an open source Python client library with Streaming DataFrames and stateful operations, such as window functions and custom aggregations. It provides a high-level abstraction over Apache Kafka so that Python developers can work with streaming data with resilience and durability guarantees. Because it is open source, there is no vendor lock-in for your business logic. Quix Streams runs anywhere Python 3.8+ is installed.
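To give a feel for the API, here is a minimal Quix Streams sketch that filters a stream of JSON readings and republishes the matches. The broker address, topic names, and temperature field are placeholder assumptions for illustration, not part of this integration:

from quixstreams import Application

# Minimal sketch, assuming a local Kafka broker and hypothetical topic names
app = Application(broker_address="localhost:9092", consumer_group="demo")
readings = app.topic("sensor-readings", value_deserializer="json")
alerts = app.topic("temperature-alerts", value_serializer="json")

sdf = app.dataframe(readings)        # Streaming DataFrame over the input topic
sdf = sdf[sdf["temperature"] > 90]   # keep only readings above a threshold
sdf = sdf.apply(lambda row: {**row, "alert": "temperature_high"})  # enrich rows
sdf = sdf.to_topic(alerts)           # publish results downstream

if __name__ == "__main__":
    app.run(sdf)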

Quix Cloud is a fully-managed platform for running Quix Streams tasks in streaming ETL pipelines. You can build pipelines using the cloud IDE and the visual DAG editor and deploy with a single click. Quix Cloud provides managed Apache Kafka, Kubernetes, monitoring, and CI/CD. You can use it as a serverless platform or deploy it to your on-premise infrastructure to build a hybrid edge-to-cloud data platform.

How to use this integration

To use this integration, you need to have Apache Kafka running somewhere—Quix uses Kafka as its data backbone to store messages, alerts, and intermediate computations/aggregations. You can bring your own Kafka cluster or use the managed Kafka on Quix Cloud.

You can sign up for a free trial account on the Quix website.

Once you have an account, create a project, create an initial environment, then create an application. Navigate to the code samples and locate the input InfluxDB 3.0 Source connector.

Define the following environment variables:

  • output: The name of the output Kafka topic that will receive the stream. If the topic doesn’t exist, it gets created automatically on first run. (Default: influxdb, Required: True)
  • task_interval: The interval at which to run the query, in InfluxDB duration notation: 1s, 1m, 1h, 1d, 1w, 1mo, 1y. (Default: 5m, Required: True)
  • INFLUXDB_HOST: Host address for the InfluxDB instance. (Default: eu-central-1-1.aws.cloud2.influxdata.com, Required: True)
  • INFLUXDB_TOKEN: Authentication token to access InfluxDB. (Default: TOKEN, Required: True)
  • INFLUXDB_ORG: Organization name in InfluxDB. (Default: ORG, Required: False)
  • INFLUXDB_DATABASE: Database name in InfluxDB where data is stored. (Default: DATABASE, Required: True)
  • INFLUXDB_MEASUREMENT_NAME: The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used. (Required: False)
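If you want to smoke-test the sample outside Quix Cloud, one option is to set the required variables before the sample code reads them. A quick sketch; the values are placeholders to replace with your own:

import os

# Placeholder values for local testing only
os.environ.setdefault("output", "influxdb")
os.environ.setdefault("task_interval", "5m")
os.environ.setdefault("INFLUXDB_HOST", "eu-central-1-1.aws.cloud2.influxdata.com")
os.environ.setdefault("INFLUXDB_TOKEN", "<your-token>")
os.environ.setdefault("INFLUXDB_DATABASE", "<your-database>")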

The following code runs in the Quix IDE, which you’ll see after you create your first application (you can also work locally by creating an application from this code using the Quix samples library).

# Import utility modules
import os
import random
import json
import logging
from time import sleep

# Import vendor-specific libraries
from quixstreams import Application
from influxdb_client_3 import InfluxDBClient3

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a Quix Application
app = Application(consumer_group="influxdb_sample", auto_create_topics=True)

# Define the topic using the "output" environment variable, along with how its
# data should be serialized
topic = app.topic(
    name=os.environ["output"],
    key_serializer="string",
    value_serializer="json"
)

influxdb3_client = InfluxDBClient3(
    token=os.environ["INFLUXDB_TOKEN"],
    host=os.environ["INFLUXDB_HOST"],
    org=os.environ["INFLUXDB_ORG"],
    database=os.environ["INFLUXDB_DATABASE"]
)

measurement_name = os.getenv("INFLUXDB_MEASUREMENT_NAME", os.environ["output"])
interval = os.getenv("task_interval", "5m")

# Global variable to control the main loop's execution
run = True

# InfluxDB interval-to-seconds conversion dictionary
UNIT_SECONDS = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    "mo": 2592000,  # approximated as 30 days
    "y": 31536000,
}

# Helper function to convert an InfluxDB interval string (like "1h" or "2m")
# into seconds, used to determine how often to poll InfluxDB.
def interval_to_seconds(interval: str) -> int:
    # "mo" is the only two-character unit; all others are a single character
    unit = "mo" if interval.endswith("mo") else interval[-1]
    try:
        return int(interval[:-len(unit)]) * UNIT_SECONDS[unit]
    except ValueError:
        raise ValueError(
            "interval format is {int}{unit} i.e. '10h'; "
            f"valid units: {list(UNIT_SECONDS.keys())}")
    except KeyError:
        raise ValueError(
            f"Unknown interval unit: {unit}; "
            f"valid units: {list(UNIT_SECONDS.keys())}")

interval_seconds = interval_to_seconds(interval)

# Generator that fetches data from InfluxDB and yields it for publishing.
# It runs in a continuous loop, querying periodically based on the interval.
def get_data():
    # Run in a loop until the main thread is terminated
    while run:
        try:
            query_definition = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}'
            logger.info(f"Sending query {query_definition}")
            # Query InfluxDB 3.0 using influxql or sql
            table = influxdb3_client.query(
                query=query_definition,
                mode="pandas",
                language="influxql"
            )
            table = table.drop(columns=["iox::measurement"])

            # If there are rows to write to the stream at this time
            if not table.empty:
                # Convert to JSON for JSON-to-bytes serializer
                json_result = table.to_json(orient='records', date_format='iso')
                yield json_result
                logger.info("query success")
            else:
                logger.info("No new data to publish.")

            # Wait for the next interval
            sleep(interval_seconds)

        except Exception as e:
            logger.error(f"query failed; error: {e}")
            sleep(1)

def main():
    """
    Read data from the InfluxDB query and publish it to Kafka
    """

    # Create a pre-configured Producer object.
    # Producer is already set up to use Quix brokers.
    # It will also ensure that the topics exist before producing to them if
    # Application is initialized with "auto_create_topics=True".

    with app.get_producer() as producer:
        for res in get_data():
            # Parse the JSON string into a Python object
            records = json.loads(res)
            for index, obj in enumerate(records):
                # Generate a message key for each row (random prefix plus row index)
                message_key = f"INFLUX_DATA_{str(random.randint(1, 100)).zfill(3)}_{index}"
                logger.info(f"Produced message with key:{message_key}, value:{obj}")

                # Serialize data for kafka producing
                serialized = topic.serialize(key=message_key, value=obj)

                # publish the data to the topic
                producer.produce(
                    topic=topic.name,
                    key=serialized.key,
                    value=serialized.value,
                )

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        logger.info("Stop requested...")
        run = False
    finally:
        logger.info("Application has been stopped")

You can find more details on how to run and deploy an application in the Quix documentation.

Once the data is available in a Kafka topic, you can use another service to consume and process the data (you can port over common InfluxDB tasks such as downsampling). When you’re ready to write the data back into InfluxDB, use the corresponding output InfluxDB 3.0 Sink connector.
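As an illustration of such a downsampling service, here is a minimal Quix Streams sketch that averages values over one-minute tumbling windows. The influxdb input topic matches the default above; the output topic name and the numeric temperature field are assumptions to adapt to your schema:

from datetime import timedelta

from quixstreams import Application

# Minimal downsampling sketch; field and output topic names are assumptions
app = Application(consumer_group="downsampler", auto_create_topics=True)
input_topic = app.topic("influxdb", value_deserializer="json")
output_topic = app.topic("influxdb-downsampled", value_serializer="json")

sdf = app.dataframe(input_topic)
# Average the "temperature" field over 1-minute tumbling windows and emit
# one aggregated value per window after it closes
sdf = sdf.apply(lambda row: row["temperature"])
sdf = sdf.tumbling_window(timedelta(minutes=1)).mean().final()
sdf = sdf.to_topic(output_topic)

if __name__ == "__main__":
    app.run(sdf)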

For inspiration, see the Quix Templates gallery for ready-to-run reference use cases/data pipelines with full source code. You may be interested in syncing data between InfluxDB v2 and v3 as well as predictive maintenance with InfluxDB.

Note: The connectors covered here are for InfluxDB 3.0, but there is also an input InfluxDB 2.0 Source connector available in the Quix Samples GitHub repository.

For more information, please check out the documentation.
