This is part two of Getting Started Tutorials for the TICK Stack. If you’re new to the TICK Stack, I recommend first learning about different methods for writing static data in batches to InfluxDB in part one of this Getting Started series. This is a beginner’s tutorial for how and when to write real-time data to InfluxDB using:

  • Telegraf and the Exec Plugin
  • Telegraf and the Tail Plugin

The repo for this tutorial is here. For this tutorial I used Alpha Vantage’s free “Digital & Crypto Currencies Realtime” API to get the data. After you claim your key, Alpha Vantage offers real-time intraday BTC price and volume data in 5 min resolution. Here is an example of the output data.

1. Telegraf and the Exec Plugin

Using Telegraf with the Exec Input Plugin lets you execute commands at a set interval to retrieve metrics and write them to InfluxDB. First, the function data_request() makes a request to Alpha Vantage and converts the most recent data point to line protocol with nanosecond precision. I gather only the most recent data point because I don’t want to rewrite 24 hours’ worth of data every 5 minutes.

#exec.py
import requests
from alphavantage_auth import key
import datetime

#Using Alpha Vantage to get BTC prices every 5 minutes
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
#make request
def data_request():
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    #we only want the last datapoint
    t = [t for t in data['Time Series (Digital Currency Intraday)']]
    t = t[0]
    #convert human readable time to unix time
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
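    #strftime("%s") is platform-specific and uses local time, so treat the timestamp as UTC instead
    #(assumption: confirm the time zone reported in the response's metadata)
    unix = int(t.replace(tzinfo=datetime.timezone.utc).timestamp())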
    #convert timestamp to nanosecond precision
    unix_ns = str(unix) + "000000000"
    fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
    #convert to line protocol
    line = str("price"
         + ",type=BTC"
         + " "
         + "price=" + str(fields[0]['1a. price (USD)']) + ","
         + "volume=" + str(fields[0]['2. volume'])
         + " " + unix_ns)
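    return line

#the Exec plugin parses whatever the command prints to stdout, so print only the line itself
if __name__ == "__main__":
    print(data_request())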
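
The string concatenation above can also be wrapped in a small helper. The following is a minimal sketch, not code from the tutorial repo: the names to_unix_ns and make_line are hypothetical, the price and volume values are made-up samples, and it assumes the timestamps are in UTC and that tag and field values contain no spaces or commas that would need escaping.

```python
import datetime

def to_unix_ns(timestamp):
    """Convert 'YYYY-MM-DD HH:MM:SS' (assumed to be UTC) to a nanosecond Unix timestamp."""
    t = datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    t = t.replace(tzinfo=datetime.timezone.utc)
    return int(t.timestamp()) * 1_000_000_000

def make_line(measurement, tags, fields, unix_ns):
    """Build one line of line protocol: measurement[,tags] fields timestamp."""
    head = measurement
    if tags:
        head += "," + ",".join(f"{k}={v}" for k, v in tags.items())
    field_part = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{head} {field_part} {unix_ns}"

# Made-up sample values, only to show the shape of the output
line = make_line("price", {"type": "BTC"},
                 {"price": "6500.25", "volume": "1200.5"},
                 to_unix_ns("2018-06-01 00:05:00"))
print(line)
```

Keeping the timestamp conversion in its own function makes it easy to check on its own, and the integer multiplication avoids building the nanosecond value by string padding.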

Then I configure my exec.conf file which contains the Exec Input and the InfluxDB Output with these changes:

  • Set the data collection interval to 300s (5 min) since Alpha Vantage only offers data in 5 min resolution (line 28):  interval = "300s"
  • Specify the timestamp precision (line 65): precision = "ns"
  • Omit the “host” tag (line 78): omit_hostname = true
  • Specify server by uncommenting (line 93): urls = ["http://127.0.0.1:8086"] # required
  • Create a target database to write the data to (line 96): database = "exec"
  • Specify the command for the exec plugin to execute (line 228): commands = ["python /Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/exec.py" ]
  • Since I only have one command, I can comment out the name suffix (line 235): #name_suffix = "_mycollector"
  • Specify the data format to consume. Since the data has been converted to line protocol, influx is chosen (line 241): data_format = "influx"
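
Before starting Telegraf, it can help to confirm that the command listed under commands really prints line protocol and nothing else. The following is a rough sketch under stated assumptions: check_command and LINE_RE are hypothetical names, the regular expression is only a loose shape check (not a full line protocol parser), and the demo command is a stand-in that prints a made-up point so the sketch runs without an API key.

```python
import re
import subprocess
import sys

# Loose shape check: measurement[,tag=value...] field=value[,field=value...] 19-digit timestamp
LINE_RE = re.compile(
    r"^[^\s,]+(,[^\s=,]+=[^\s,]+)* [^\s=,]+=[^\s,]+(,[^\s=,]+=[^\s,]+)* \d{19}$"
)

def check_command(command):
    """Run a command once and report stdout lines that do not look like line protocol."""
    result = subprocess.run(command, capture_output=True, text=True, timeout=60)
    lines = [l for l in result.stdout.splitlines() if l.strip()]
    bad = [l for l in lines if not LINE_RE.match(l)]
    return result.returncode, lines, bad

# Stand-in command; replace with the same command you put in the config
demo = [sys.executable, "-c",
        "print('price,type=BTC price=6500.25,volume=1200.5 1527811500000000000')"]
code, lines, bad = check_command(demo)
print(code, len(lines), bad)
```

Any extra print statement in the script (a debug message, for example) would show up in the list of bad lines, which is the kind of output the influx data format cannot parse.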

Now I can navigate to the directory where my exec.conf file resides and run telegraf -config exec.conf. I am able to see the first point immediately, but I suggest coming back after 30 minutes to make sure that Telegraf is still writing a new point every 5 minutes.
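
One way to do that check from Python is to query the database over HTTP. This is a hedged sketch that assumes an InfluxDB 1.x instance at http://127.0.0.1:8086, the database name exec from the config, and the measurement name price from the script. The function names are hypothetical, and the sample payload is hand-written in the shape the /query endpoint returns, so the demonstration runs without a server.

```python
import json
import urllib.parse
import urllib.request

def build_query_url(base_url, database, query):
    """Build a URL for the InfluxDB 1.x /query endpoint."""
    params = urllib.parse.urlencode({"db": database, "q": query, "epoch": "s"})
    return f"{base_url}/query?{params}"

def rows_from_response(payload):
    """Flatten a /query JSON response into a list of {column: value} dicts."""
    rows = []
    for result in payload.get("results", []):
        for series in result.get("series", []):
            for values in series["values"]:
                rows.append(dict(zip(series["columns"], values)))
    return rows

def latest_points(base_url="http://127.0.0.1:8086", database="exec", limit=6):
    """Fetch the most recent points; needs a running InfluxDB, so it is not called here."""
    query = f"SELECT * FROM price ORDER BY time DESC LIMIT {limit}"
    with urllib.request.urlopen(build_query_url(base_url, database, query)) as resp:
        return rows_from_response(json.load(resp))

# Offline demonstration with a made-up payload
sample = {"results": [{"statement_id": 0, "series": [{
    "name": "price", "columns": ["time", "price", "type", "volume"],
    "values": [[1527811500, 6500.25, "BTC", 1200.5]]}]}]}
print(rows_from_response(sample))
```

With one point every 5 minutes, calling latest_points() after half an hour should return several rows with increasing timestamps if everything is wired up as described.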

2. Telegraf and the Tail Plugin

The Tail Input Plugin tails a logfile and parses the metrics, and Telegraf writes the data to InfluxDB. tail.py is similar to exec.py, except that each new point is appended to tail.txt instead of being handed to Telegraf by the Exec plugin. Also, the run delay is defined within tail.py with time.sleep() instead of by the collection interval in the config, as with the Exec plugin. This means tail.py has to keep running alongside Telegraf.

import requests
import time
from alphavantage_auth import key
import datetime

#Using Alpha Vantage to get BTC prices every 5 minutes
#Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"
#build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey
while True:
    #make request
    data = requests.get(target_url).json()
    #data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    #we only want the last datapoint
    t = [t for t in data['Time Series (Digital Currency Intraday)']]
    t = t[0]
    t = datetime.datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
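    #strftime("%s") is platform-specific and uses local time, so treat the timestamp as UTC instead
    #(assumption: confirm the time zone reported in the response's metadata)
    unix = int(t.replace(tzinfo=datetime.timezone.utc).timestamp())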
    #convert to nanosecond precision
    unix_ns = str(unix) + "000000000"
    fields = [v for k, v in data['Time Series (Digital Currency Intraday)'].items()]
    #convert to line protocol
    line = ["price"
         + ",type=BTC"
         + " "
         + "price=" + str(fields[0]['1a. price (USD)']) + ","
         + "volume=" + str(fields[0]['2. volume'])
         + " " + unix_ns]
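    #the with block closes (and flushes) the file, so the new line is on disk before the script sleeps
    with open('Data/tail.txt', 'a') as thefile:
        for item in line:
            thefile.write("%s\n" % item)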
    print("line added")
    #alpha vantage only adds points every 5 min, so set script to sleep for 5 min as well
    time.sleep(300)

Then I configure a second Telegraf config file, which contains the Tail Input and the InfluxDB Output, with these changes:

  • Omit the “host” tag (line 78): omit_hostname = true
  • Specify server by uncommenting (line 93): urls = ["http://127.0.0.1:8086"] # required
  • Create a target database to write the data to (line 96): database = "tail"
  • Provide the path to tail.txt (line 241):
    files = ["/Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/Data/tail.txt"]
  • Set the “Read from Beginning” parameter to false so that Tail reads the appended data, like the unix command tail -F --lines=0 myfile.log (line 243): from_beginning = false
  • Specify the data format to consume. Since the data has been converted to line protocol, influx is chosen (line 254): data_format = "influx"
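
To make the from_beginning = false behavior concrete, here is a small illustration in Python. It is only a sketch of the idea (skip what is already in the file, then pick up appended lines), not how Telegraf implements tailing. The function name read_new_lines and the sample points are hypothetical, and a temporary file stands in for tail.txt.

```python
import os
import tempfile

def read_new_lines(handle):
    """Return complete lines appended since the last call."""
    lines = []
    while True:
        pos = handle.tell()
        line = handle.readline()
        if not line.endswith("\n"):
            handle.seek(pos)  # nothing new, or a partially written line: retry later
            break
        lines.append(line.rstrip("\n"))
    return lines

path = os.path.join(tempfile.mkdtemp(), "tail.txt")
with open(path, "a") as f:
    f.write("price,type=BTC price=1,volume=1 1527811200000000000\n")  # already in the file

reader = open(path, "r")
reader.seek(0, os.SEEK_END)  # like from_beginning = false: skip existing content

with open(path, "a") as f:
    f.write("price,type=BTC price=2,volume=2 1527811500000000000\n")  # appended later

new_lines = read_new_lines(reader)
reader.close()
print(new_lines)
```

Only the second point is returned, because the reader started at the end of the file. Starting at the beginning instead would re-send every point already stored in tail.txt.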

Exec vs. Tail and When You Should Use Them

Now that you know how to use these plugins, you might be wondering: “When should I use them?” I hope to benchmark and compare the two approaches in a future blog post, but for now I can offer some guidelines. The Exec Input Plugin is probably the easiest option if your data collection involves scraping or making API requests. The Tail Input Plugin is a good fit for IoT monitoring. For example, you could format the data coming in on the TTY of a Telegraf box as line protocol and append it to a file. The Tail plugin would then “tail” that file, and all of the device data would go to InfluxDB.
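
As a rough illustration of the IoT case, the sketch below turns raw device readings into line protocol and writes them to a sink that Tail could follow. Everything specific here is an assumption made for the example: the "device_id,temperature" input format, the measurement name temperature, and the function names. io.StringIO objects stand in for the TTY and for the tailed file so the sketch runs anywhere.

```python
import io
import time

def format_reading(raw, now_ns):
    """Turn 'device_id,temperature' into line protocol; return None if malformed."""
    parts = raw.strip().split(",")
    if len(parts) != 2 or not parts[0]:
        return None
    device, value = parts
    try:
        temperature = float(value)
    except ValueError:
        return None
    return f"temperature,device={device} value={temperature} {now_ns}"

def relay(source, sink, clock=time.time_ns):
    """Copy readings from source to sink as line protocol, skipping bad lines."""
    written = 0
    for raw in source:
        line = format_reading(raw, clock())
        if line is not None:
            sink.write(line + "\n")
            sink.flush()  # make each line visible to a tailing reader promptly
            written += 1
    return written

# Stand-ins for the device stream and the tailed file; the clock is fixed for repeatability
device = io.StringIO("probe1,21.5\ngarbage\nprobe1,21.7\n")
log = io.StringIO()
count = relay(device, log, clock=lambda: 1527811500000000000)
print(count)
print(log.getvalue(), end="")
```

In a real setup the source would be the opened serial device and the sink a file opened in append mode, with the clock left at its default.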

I hope you’re starting to feel more comfortable using Telegraf and InfluxDB. If you have any questions, please post them on the community site or tweet us @InfluxDB. Thanks!
