If you’re looking to monitor your infrastructure or third-party applications, then Telegraf’s built-in plugins are a great option, whether you’re looking at system resources like disk and network utilization or the performance of your MySQL database.

What if you’re building an application, though, where you want to store user data in a Time Series Database? Maybe it’s an IoT or smart home application, and each user needs access to readings from, say, their smart toothbrush. You want to store the time and duration of each brushing session, send out alerts to remind the kids to brush their teeth, and keep track of things like battery health and how long the current brush head has been in use.

Collecting custom data, whether for a user-facing application or for an infrastructure requirement that Telegraf’s plugins don’t already cover, is probably going to require writing new code.

For the smart toothbrush example, maybe you have a base station which runs embedded Linux and communicates with the toothbrush using Bluetooth. You’ve already written up code that listens for incoming data, and it seems to be working well; now you need to get it into InfluxDB.

One method would be to run Telegraf alongside your application, and send it your data over a Unix, UDP, or TCP socket, letting Telegraf handle the connection to InfluxDB and batching and writing of points.

This is great if all you need is to collect data, but if you need to query and retrieve that data for your users, you’ll probably want to take advantage of one of the InfluxDB libraries available in various languages to handle the interaction with InfluxDB within your application itself.

There are a number of languages out there that already have InfluxDB libraries, many of them maintained by the community. We’ll take a closer look at using the influxdb-python library in this post, but if Python isn’t your style, you can find a list of libraries on the InfluxDB API client libraries page.

InfluxDB Python Client Library

While the influxdb-python library is hosted by InfluxDB’s GitHub account, it’s maintained by a trio of community volunteers, @aviau@xginn8, and @sebito91. Many thanks to them for their hard work and contributions back to the community.

What You’ll Need

The following examples were tested against a MacOS system with Python 3 installed via Homebrew (instructions here) and an Ubuntu 16.04 system using the default Python 3 installation.

Installations of Python can get a bit tricky; different versions of the language, as well as projects which require different versions of installed libraries, can quickly lead to conflicts. While we won’t go into the details of Python installations here, understanding how various versions are installed and interact with each other, and looking into additional tooling like virtualenv or pyenv might be useful.

You can find some additional articles about installing Python and additional tooling at The Hitchhiker’s Guide to Python (MacLinux).

We’ll also be sending data to a local instance of InfluxDB. If you don’t already have one, you can follow the installation instructions on our documentation page, or use the sandbox scripts to bring up a full TICK Stack in Docker.

Installing the Library

Like many Python libraries, the easiest way to get up and running is to install the library using pip.

We’re going to run pip using the -m argument to the Python command, in order to be certain which Python is the install target (as per this tip from Raymond Hettinger).

$ python3 -m pip install influxdb

You should see some output indicating success.

We’ll work through some of the functionality of the Python library using a REPL, so that we can enter commands and immediately see their output. Let’s start the REPL now, and import the InfluxDBClient from the python-influxdb library to make sure it was installed:

$ python3
Python 3.6.4 (default, Mar  9 2018, 23:15:03) 
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from influxdb import InfluxDBClient
>>>

No errors—looks like we’re ready to go!

Making a Connection

The next step will be to create a new instance of the InfluxDBClient (API docs), with information about the server that we want to access. Enter the following command in your REPL, replacing the values of host and port with the appropriate URL/IP address and port of your InfluxDB host. In this case, we’re running locally on the default port:

>>> client = InfluxDBClient(host='localhost', port=8086)

There are some additional parameters available to the InfluxDBClient constructor, including username and password, which database to connect to, whether or not to use SSL, timeout and UDP parameters.

If you wanted to connect to a remote host at mydomain.com on port 8086 with username myuser and password mypass and using SSL, you could use the following command instead, which enables SSL and SSL verification with two additional arguments, ssl=True and ssl_verify=True:

>>> client = InfluxDBClient(host='mydomain.com', port=8086, username='myuser', password='mypass' ssl=True, verify_ssl=True)

Now, let’s create a new database called pyexample to store our data:

>>> client.create_database('pyexample')

We can check if the database is there by using the get_list_database() function of the client:

>>> client.get_list_database()
[{'name': 'telegraf'}, {'name': '_internal'}, {'name': 'pyexample'}]

There it is, in addition to the telegraf and _internal databases I have on my install. Finally, we’ll set the client to use this database:

>>> client.switch_database('pyexample')

Inserting Data

Now that we have a database to write data to, and our client properly configured, it’s time to insert some data! We’re going to use our client’s write_points() methods to do so (API docs). This method takes a list of points and some additional parameters including “batch size”, which gives us the ability to insert data in batches as opposed to all at once. This can be useful if you’re inserting large amounts of data.

The write_points() method has an argument called points, which is a list of dictionaries, and contains the points to be written to the database. Let’s create some sample data now and insert it. First, let’s add three points in JSON format to a variable called json_body:

>>> json_body = [
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Carol",
            "brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2018-03-28T8:01:00Z",
        "fields": {
            "duration": 127
        }
    },
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Carol",
            "brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2018-03-29T8:04:00Z",
        "fields": {
            "duration": 132
        }
    },
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Carol",
            "brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f"
        },
        "time": "2018-03-30T8:02:00Z",
        "fields": {
            "duration": 129
        }
    }
]

These indicate “brush events” for our smart toothbrush; each one happens around 8AM in the morning, is tagged with the username of the person using the toothbrush and an ID of the brush itself (so we can track how long each brush head has been used for), and has a field which contains how long the user brushed for, in seconds.

Since we already have our database set, and the default input for write_points() is JSON, we can invoke that method using our json_body variable as the only argument, as follows:

>>> client.write_points(json_body)
True

You should see the response True being returned by the function if the write operation has been successful. If you’re building an application, you’d want this collection of data to be automatic, adding points to the database every time a user interacts with the toothbrush.

Querying Data

Now that we have some data in the database, let’s try running some queries to get it back out. We’ll use the same client object as we used to write data, except this time we’ll execute a query on InfluxDB and get back the results using our client’s query() function (API docs).

>>> client.query('SELECT "duration" FROM "pyexample"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "user"')
>>>

The query() function returns a ResultSet object (API Docs), which contains all the data of the result along with some convenience methods. Our query is requesting all the measurements in our pyexample database, grouped by user. You can use the .raw parameter to access the raw JSON response from InfluxDB:

>>> results.raw
{'statement_id': 0, 'series': [{'name': 'brushEvents', 'tags': {'user': 'Carol'}, 'columns': ['time', 'duration'], 'values': [['2018-03-28T08:01:00Z', 127], ['2018-03-29T08:04:00Z', 132], ['2018-03-30T08:02:00Z', 129]]}]}

In most cases you won’t need to access the JSON directly, however. Instead, you can use the get_points() method of the ResultSet to get the measurements from the request, filtering by tag or field. If you wanted to iterate through all of Carol’s brushing sessions; you could get all the points that are grouped under the tag “user” with the value “Carol”, using this command:

>>> points = results.get_points(tags={'user':'Carol'})

points in this case is a Python Generator, which is a function that works similarly to an Iterator; you can iterate over it using a for x in y loop, as follows:

>>> points = results.get_points(tags={'user': 'Carol'})
>>> for point in points:
...     print("Time: %s, Duration: %i" % (point['time'], point['duration']))
... 
Time: 2018-03-28T08:01:00Z, Duration: 127
Time: 2018-03-29T08:04:00Z, Duration: 132
Time: 2018-03-30T08:02:00Z, Duration: 129

Depending on your application, you might iterate through these points to compute the average brushing time for your user, or just to verify that there have been X number of brushing events per day.

If you were interested in tracking the amount of time an individual brush head has been used, you could substitute a new query that groups points based on the brushId, then take the duration of each of those points and add it to a sum. At a certain point you could alert your user that it’s time to replace their brush head:

>>> results = client.query('SELECT "duration" FROM "pyexample"."autogen"."brushEvents" WHERE time > now() - 4d GROUP BY "brushId"')
>>> points = results.get_points(tags={'brushId': '6c89f539-71c6-490d-a28d-6c5d84c0ee2f'})
>>> brush_usage_total = 0
>>> for point in points:
...     brush_usage_total = brush_usage_total + point['duration']
... 
>>> if brush_usage_total > 350:
...     print("You've used your brush head for %s seconds, more than the recommended amount! Time to replace your brush head!" % brush_usage_total)
... 
You've used your brush head for 388 seconds, more than the recommended amount! Time to replace your brush head!
>>>

Additional Documentation and Functionality

The influx-python library contains a fair bit of additional functionality that we didn’t cover in the article above. There is additional administrative functionality in the client like adding users, managing databases, and dropping measurements, as well as additional objects like SeriesHelper, which provides some convenience functionality for writing points in bulk, and DataFrameClient, which eases integration with PANDAS and DataFrames.

If you’re interested in using this library in your projects, it makes sense to spend some time with the API Documentation, and source code, understanding not only the functionality that is provided but the ways it works behind the scenes.

And if you build something cool with InfluxDB, we’d love to feature it on our blog, so share it with us on Twitter @InfluxDB!

X
Contact Sales