Getting Started with Python and InfluxDB v2.0

(Update: InfluxDB 3.0 moved away from Flux and a built-in task engine. Users can use external tools, like Python-based Quix, to create tasks in InfluxDB 3.0.)

With 200+ plugins, Telegraf covers a wide variety of data collection applications. However, sometimes you need to collect custom data, or you want to integrate external tools into your time series data analysis. In that case, it makes sense to take advantage of InfluxDB’s Client Libraries. Today we will focus on how to use the latest InfluxDB Python Client Library with InfluxDB v2.0. If you are running InfluxDB v1.x, please take a look at this tutorial instead.

InfluxDB Python Client Library

The InfluxDB Python Client has undergone some pretty big improvements since v1.x. It is faster - much faster - and easier to use. It supports multiprocessing and can return query results as a Pandas DataFrame. The WriteAPI supports synchronous, asynchronous and batching writes into InfluxDB v2.0, and it accepts data in four different formats. The QueryAPI supports several query options, including the ability to interrupt a query stream.

Requirements

This tutorial was executed on a macOS system with Python 3 installed via Homebrew and Conda for Python 3.6 and Python 3.7, respectively. Python 3.7 is required to execute multiprocessing. I recommend setting up additional tooling like virtualenv, pyenv, or conda-env to simplify Python and client installations. Otherwise, the full requirements can be found here.

Installation

To install the InfluxDB Python Client Library, simply run:

pip install influxdb-client

If you already have the client installed, you can upgrade it with:

pip3 install --upgrade influxdb-client

Gathering Auth Parameters

In order to use the client, you’ll need to gather the following parameters:

  • Bucket Name or ID: Follow this documentation to create a bucket. To view your buckets, either use the UI or execute:

influx -t <your-token> -o <your-org> bucket find

  • Token: Follow this documentation to create a token. To view your tokens, either use the UI or execute:

influx auth find

  • Org: To view your orgs, either use the UI or execute:

influx org find
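
One way to keep these values out of the script itself is to read them from environment variables. The sketch below is only an illustration: the helper name load_influx_settings is hypothetical, the variable names INFLUXDB_V2_URL, INFLUXDB_V2_ORG and INFLUXDB_V2_TOKEN are the ones I understand the client's from_env_properties() helper to read, and INFLUXDB_BUCKET is a name made up for this example.

```python
import os


def load_influx_settings(env=None):
    """Collect connection settings from a mapping of environment variables."""
    env = os.environ if env is None else env
    settings = {
        # Falls back to the local URL used throughout this tutorial
        "url": env.get("INFLUXDB_V2_URL", "http://localhost:9999"),
        "org": env.get("INFLUXDB_V2_ORG"),
        "token": env.get("INFLUXDB_V2_TOKEN"),
        "bucket": env.get("INFLUXDB_BUCKET"),  # assumed name, not read by the client
    }
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise ValueError("missing InfluxDB settings: " + ", ".join(missing))
    return settings


# Example with an explicit mapping instead of the real environment
example = load_influx_settings({
    "INFLUXDB_V2_ORG": "my-org",
    "INFLUXDB_V2_TOKEN": "my-token",
    "INFLUXDB_BUCKET": "my-bucket",
})
print(example["url"], example["org"], example["bucket"])
```

The returned values can then be passed to the client as url, token and org when you establish the connection.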

Imports and connections

Import the Client:

from influxdb_client import InfluxDBClient

Establish a connection, using the token and org you gathered above:

token = "<your-token>"
org = "my-org"
client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)

Writes

For this tutorial, we will explore all the ways we can write data related to the water levels of Coyote Creek. Our schema will look like this:

  • Bucket: “my-bucket”
  • Measurement: “h2o_feet”
  • Tag Key: “location”
  • Tag Value: “coyote_creek”
  • Field Key: “water_level”
  • Field Value: 1

First, instantiate the WriteAPI:

write_api = client.write_api()

The WriteAPI accepts data in four different formats: Line Protocol String, Line Protocol Bytes, Data Point Structure, and Dictionary Style.

Write Option One - Line Protocol String:

Line protocol is the ingest format for InfluxDB. This option is useful when your data is already formatted as line protocol (e.g. in text files).

write_api.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=1"])
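
To make the layout of a line protocol string more concrete, here is a minimal sketch that assembles one by hand: the measurement, then comma-separated tags, a space, the fields, and an optional timestamp. The helper to_line_protocol is hypothetical and deliberately simplified (it does not escape spaces, commas or quotes), so treat it as an illustration of the format rather than a replacement for the client's own serialization. Note that integer fields carry an "i" suffix, while a bare number such as water_level=1 is read as a float.

```python
def to_line_protocol(measurement, tags, fields, timestamp=None):
    """Build one line of line protocol (simplified: no escaping of special characters)."""

    def format_field(value):
        if isinstance(value, bool):  # check bool first: bool is a subclass of int
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"  # integers carry an "i" suffix
        if isinstance(value, float):
            return repr(value)
        return '"' + str(value) + '"'  # strings are double-quoted

    tag_part = "".join(f",{key}={value}" for key, value in sorted(tags.items()))
    field_part = ",".join(f"{key}={format_field(value)}" for key, value in fields.items())
    line = f"{measurement}{tag_part} {field_part}"
    if timestamp is not None:
        line += f" {timestamp}"
    return line


line = to_line_protocol("h2o_feet", {"location": "coyote_creek"}, {"water_level": 1.0})
print(line)  # h2o_feet,location=coyote_creek water_level=1.0
```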

Write Option Two - Line Protocol Bytes:

This is line protocol that is already encoded with UTF-8 as a byte array. The client converts line protocol strings into this format internally.

write_api.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=1".encode()])

Write Option Three - Data Point structure:

This structure is useful when constructing line protocol in client code. The Point() class ensures that your data is properly serialized into line protocol.

from influxdb_client import Point

write_api.write("my-bucket", "my-org", Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 1).time(1))

Write Option Four - Dictionary Style:

This is the most Pythonic way to represent data. Dictionaries are converted into Point() structures internally.

write_api.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"}, "fields": {"water_level": 1}, "time": 1}])
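
In practice you will often have many readings rather than a single one. As a rough sketch, the hypothetical helper below turns (epoch seconds, water level) pairs into a list of dictionary-style records with the same keys as above. It assumes timestamps are written as integer nanoseconds, which is my understanding of the client's default write precision; the sample readings are invented for illustration.

```python
def readings_to_records(readings, location):
    """Turn (epoch_seconds, level) pairs into dictionary-style records."""
    return [
        {
            "measurement": "h2o_feet",
            "tags": {"location": location},
            "fields": {"water_level": level},
            # Convert seconds to nanoseconds (assumed write precision)
            "time": epoch_seconds * 1_000_000_000,
        }
        for epoch_seconds, level in readings
    ]


# Invented sample readings, six minutes apart
readings = [(1_577_836_800, 1.0), (1_577_837_160, 1.2)]
records = readings_to_records(readings, "coyote_creek")
print(len(records), records[0]["fields"], records[0]["time"])
```

The resulting list has the same shape as the record argument passed to write_api.write() in Write Option Four.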

Now that we know about all the ways that we can write to the database, let’s explore how we execute those writes. We have a few tuning options available to us. We can specify synchronous and asynchronous writes. We can also tune the batching of our writes.

Specify Write Options: Synchronous Writes:

First you’ll need to import the SYNCHRONOUS write option with:

from influxdb_client.client.write_api import SYNCHRONOUS

To specify a synchronous write, simply instantiate the WriteAPI with that option as the parameter.

write_api = client.write_api(write_options=SYNCHRONOUS)

Specify Write Options: Asynchronous Writes:

First you’ll need to import the ASYNCHRONOUS write option with:

from influxdb_client.client.write_api import ASYNCHRONOUS

To specify an asynchronous write, simply instantiate the WriteAPI with that option as the parameter.

write_api = client.write_api(write_options=ASYNCHRONOUS)

Specify Write Options: Batching

The default instance of WriteAPI uses batching. You can specify the following batch parameters:

  • batch_size: the number of data points to collect in a batch
  • flush_interval: the number of milliseconds before the batch is written
  • jitter_interval: the maximum number of milliseconds by which the batch flush interval is randomly increased
  • retry_interval: the number of milliseconds to wait before retrying an unsuccessful write. The retry interval is used when the InfluxDB server does not specify a "Retry-After" header

Please look at the glossary to learn more about the functions of these parameters.

Example of batching options:

from influxdb_client import WriteOptions

write_api = client.write_api(write_options=WriteOptions(batch_size=500, flush_interval=10_000, jitter_interval=2_000, retry_interval=5_000))
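
To build some intuition for batch_size, here is a toy model of size-based batching in plain Python. It is not how the client implements batching internally (the client also flushes on a timer, as flush_interval suggests); the helper split_into_batches is hypothetical and only shows how 1,200 points would be grouped with batch_size=500.

```python
def split_into_batches(points, batch_size):
    """Yield successive lists of at most batch_size points."""
    for start in range(0, len(points), batch_size):
        yield points[start:start + batch_size]


# 1,200 line protocol strings standing in for real data
points = [f"h2o_feet,location=coyote_creek water_level={level}" for level in range(1200)]
batches = list(split_into_batches(points, batch_size=500))
print([len(batch) for batch in batches])  # [500, 500, 200]
```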

Querying the database

You have two options when it comes to querying Influx. You can either query using a table structure or query using a stream. You can also interrupt a stream after you’ve returned your required data. However, in order to perform a query we must first instantiate the QueryAPI.

First, instantiate the QueryAPI:

query_api = client.query_api()

Generate a Flux query:

You must query your data with Flux. If you’ve never written a Flux query, I recommend exploring the following resources:

A Flux query for the data we just wrote would look like:

query = '''from(bucket: "my-bucket")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "h2o_feet")
  |> filter(fn: (r) => r.location == "coyote_creek")
  |> filter(fn: (r) => r._field == "water_level")'''
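
If you find yourself writing many queries of this shape, you could assemble the string from its parts. The helper below, build_flux_query, is a hypothetical sketch and not part of the client; it does no escaping, so it should only be fed trusted values.

```python
def build_flux_query(bucket, measurement, field, tags=None, start="-10m"):
    """Assemble a simple Flux query that filters on measurement, field and tags."""
    lines = [
        f'from(bucket: "{bucket}")',
        f"  |> range(start: {start})",
        f'  |> filter(fn: (r) => r._measurement == "{measurement}")',
        f'  |> filter(fn: (r) => r._field == "{field}")',
    ]
    # One filter per tag key/value pair
    for key, value in (tags or {}).items():
        lines.append(f'  |> filter(fn: (r) => r.{key} == "{value}")')
    return "\n".join(lines)


flux = build_flux_query("my-bucket", "h2o_feet", "water_level", {"location": "coyote_creek"})
print(flux)
```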

Query Option One - Table Structure:

We can then use the table structure to return our data:

result = query_api.query(org="my-org", query=query)

results = []
for table in result:
    for record in table.records:
        results.append((record.get_value(), record.get_field()))

print(results)
[(1, 'water_level')]

Each record is a FluxRecord object, which has the following methods for accessing your data:

  • .get_measurement()
  • .get_field()
  • .get_value()
  • .values.get("<your tags>")
  • .get_time()
  • .get_start()
  • .get_stop()

Query Option Two - Stream

records = query_api.query_stream(org="my-org", query=query)

for record in records:
    print(f'Water level in {record["location"]} is {record["_value"]}')

Water level in coyote_creek is 1
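
Interrupting a stream follows the usual Python generator pattern: stop iterating once you have what you need, then close the generator. Since running a real query needs a live server, the sketch below uses a stand-in generator, fake_query_stream (hypothetical, with invented rows), to show the pattern; with the client you would apply the same break and close() to the object returned by query_stream.

```python
def fake_query_stream(rows):
    """Stand-in for a record stream; yields dict rows one at a time."""
    try:
        for row in rows:
            yield row
    finally:
        # Runs when the generator is exhausted or closed early
        print("stream closed")


rows = [{"location": "coyote_creek", "_value": level} for level in (1, 2, 3, 4)]
records = fake_query_stream(rows)

seen = []
for record in records:
    seen.append(record["_value"])
    if record["_value"] >= 2:
        break  # we have the data we need

records.close()  # release the stream without reading the remaining rows
print(seen)  # [1, 2]
```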

Script for writing and querying points to InfluxDB with the Python Client

Now that we understand all of our options, let’s look at what a full script might look like to write and query points to InfluxDB with the Python Client. You can find the repo with this script here.

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

org = "my-org"
bucket = "my-bucket"
token = "<your-token>"
query = '''from(bucket: "my-bucket")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "h2o_level")
  |> filter(fn: (r) => r._field == "water_level")
  |> filter(fn: (r) => r.location == "coyote_creek")'''

# establish a connection
client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)

# instantiate the WriteAPI and QueryAPI
# (a synchronous write completes before the query below runs)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

# create and write the point
p = Point("h2o_level").tag("location", "coyote_creek").field("water_level", 1)
write_api.write(bucket=bucket, org=org, record=p)

# return the table and print the result
result = query_api.query(org=org, query=query)
results = []
for table in result:
    for record in table.records:
        results.append((record.get_value(), record.get_field()))
print(results)

# release the connection
client.close()

I hope this tutorial helps you get started with Influx. As always, if you run into hurdles, please share them on our community site or Slack channel. We’d love to get your feedback and help you with any problems you run into.