Apache Arrow Basics: Coding with Apache Arrow Python

So by now, you are probably aware that InfluxData has been busy building the next generation of the InfluxDB storage engine. If you dig a little deeper, you will start to uncover some concepts that might be foreign to you:

  • Apache Parquet

  • Apache Arrow

  • Arrow Flight

These open-source projects are some of the core building blocks that make up the new storage engine. For the most part, you won’t need to worry about what’s under the hood. Though if you are like me and want a more practical understanding of what some of the projects are, then join me on my journey of discovery.

The first component we are going to dig into is Apache Arrow. My colleague Charles gave a great high-level overview, which you can find here. In short:

“Arrow manages data in arrays, which can be grouped in tables to represent columns of data in tabular form. Arrow also provides support for various formats to get that tabular data in and out of disk and networks. The most commonly used format is Parquet (you will be exposed to this concept quite a bit).”

For performance reasons, our developers used Rust to code InfluxDB’s new storage engine. I personally like to learn new coding concepts in Python, so we will be making use of the pyarrow client library.

The basics

In Apache Arrow, you have two primary data containers/classes: Arrays and Tables. We will dig more into what these are later, but let’s first write a quick snippet of code for creating each:

import pyarrow as pa

# Create arrays from lists of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])
print(table)

So in this example, you can see we constructed 3 arrays of values: animal, count, and year. We can combine these arrays to form the columns of a table. The results of running this code look like so:

animal: string
count: int8
year: int16
----
animal: [["sheep","cows","horses","foxes"]]
count: [[12,5,2,1]]
year: [[2022,2022,2022,2022]]

So now that we have a table to work with, let’s see what we can do with it. One of Arrow’s primary features is providing facilities for saving and restoring your tabular data (most commonly in the Parquet format, which will feature heavily in future blogs).

Let’s save and load our newly created table:

import pyarrow as pa
import pyarrow.parquet as pq

# Create arrays from lists of values
animal = pa.array(["sheep", "cows", "horses", "foxes"], type=pa.string())
count = pa.array([12, 5, 2, 1], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

# Save the table to a Parquet file
pq.write_table(table, 'example.parquet')

# Load the table from the Parquet file
table2 = pq.read_table('example.parquet')
print(table2)

Lastly, to finish the basics, let’s try out a compute function (value_counts). We can apply compute functions to arrays and tables, which allows us to apply transformations to a dataset. We will cover these in greater detail in the next section, but let’s start with a simple example:

import pyarrow as pa
import pyarrow.compute as pc

# Create arrays from lists of values
animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
year = pa.array([2022, 2022, 2022, 2022, 2021], type=pa.int16())

# Create a table from the arrays
table = pa.Table.from_arrays([animal, count, year], names=['animal', 'count', 'year'])

count_y = pc.value_counts(table['animal'])
print(count_y)

As you can see, we import pyarrow.compute as pc and use the built-in value_counts function. This allows us to count the number of occurrences of each value within a given array. We chose to count up the animals, which produces the following output:

-- child 0 type: string
  [
    "sheep",
    "cows",
    "horses",
    "foxes"
  ]
-- child 1 type: int64
  [
    2,
    1,
    1,
    1
  ]
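
Other compute functions follow the same pattern. As a minimal sketch (the results in the comments assume the five values from the example above), here is how you might apply sum and mean to the count array:

import pyarrow as pa
import pyarrow.compute as pc

# The count array from the value_counts example above
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())

# Aggregate compute functions operate directly on arrays
print(pc.sum(count))   # expected: 30
print(pc.mean(count))  # expected: 6.0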

A practical example

So I decided to skip listing all the datatypes and processors and instead show you a more realistic example of using Apache Arrow with InfluxDB’s TSM engine. Now, spoiler: this is not how you will interact with InfluxDB’s new storage engine (querying it is vastly slicker than this). This is purely a means to an end: pulling a large sample dataset out of InfluxDB into PyArrow so we can experiment with it. InfluxDB’s new storage engine will allow the automatic export of your data as Parquet files.

So the plan:

  • Query InfluxDB using the conventional method of the InfluxDB Python client library (using the query_data_frame method).

  • Use Apache Arrow’s built-in pandas DataFrame conversion method (from_pandas) to convert our dataset into an Arrow table.

  • Then we will use the write_dataset function to save the table as a series of partitioned Parquet files on disk.

  • Lastly, a second script will reload the partitions and perform a series of basic aggregations on our Arrow table.

Let’s take a look at the code:

create_parquet.py

from influxdb_client import InfluxDBClient
import pyarrow.dataset as ds
import pyarrow as pa

with InfluxDBClient(url="http://localhost:8086", token="edge", org="influxdb", debug=False) as client:
    query_api = client.query_api()
    query = '''
    import "influxdata/influxdb/sample"
    sample.data(set: "usgs")
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> group()
    '''

    # Run the Flux query while the client is still open and return a pandas DataFrame
    df = query_api.query_data_frame(query=query)

# Convert the DataFrame into an Arrow table
table = pa.Table.from_pandas(df)
print(table)
print("Saving to parquet files...")

# Drop the Flux result and table columns
table = table.drop(["result", "table"])
print(table)

# Partition the data into smaller chunks based on the _measurement column
ds.write_dataset(table, "usgs", format="parquet",
                 partitioning=ds.partitioning(
                    pa.schema([table.schema.field("_measurement")])
                ))

So the two functions you may be unfamiliar with are:

  • pa.Table.from_pandas(df): This automatically converts our DataFrame into an Arrow table. Note: pay attention to your data types! A quick way to check them is shown in the sketch after this list; consult the official documentation for more information.

  • ds.write_dataset(…, partitioning=ds.partitioning(…)): This method partitions our table into Parquet files based upon the values within the ‘_measurement’ column. The result looks like a tree of directories. This helps to separate large datasets into more manageable assets.
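
To illustrate the data types note above, here is a minimal sketch (using a small made-up DataFrame rather than the usgs data) that prints the schema Arrow infers during the conversion:

import pandas as pd
import pyarrow as pa

# A tiny stand-in DataFrame (hypothetical values, not the usgs sample set)
df = pd.DataFrame({"_measurement": ["earthquake", "quarry blast"],
                   "rms": [0.23, 0.81]})

# from_pandas maps each pandas dtype to an Arrow type
table = pa.Table.from_pandas(df)

# Inspect the inferred schema before writing anything to disk
print(table.schema)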

Let’s now take a look at the second script, which works with our saved Parquet files:

import pyarrow.dataset as ds

# Loading back the partitioned dataset automatically detects the partitioned files
usgs = ds.dataset("usgs", format="parquet", partitioning=["_measurement"])
print(usgs.files)

# Convert to a table
usgs = usgs.to_table()
print(usgs)

# Grouped Aggregation example
aggregation = usgs.group_by("_measurement").aggregate(
    [("rms", "mean"), ("rms", "max"), ("rms", "min")]
).to_pandas()
print(aggregation)

In this script we deploy several new functions you might be familiar with if you work with Pandas or other query engines: group_by and aggregate. We use these functions to group our data points based on the measurement and apply a mathematical aggregate to each group (mean, max, min). This generates a new Arrow table based on the aggregations. We then convert the table back to a data frame for readability.
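
If you want to try the same pattern without the usgs dataset, here is a minimal sketch (assuming pyarrow 7.0 or later, where Table.group_by was introduced) that reuses the small animals table from the basics section:

import pyarrow as pa

animal = pa.array(["sheep", "cows", "horses", "foxes", "sheep"], type=pa.string())
count = pa.array([12, 5, 2, 1, 10], type=pa.int8())
table = pa.Table.from_arrays([animal, count], names=['animal', 'count'])

# Group by animal and sum the counts, then convert back to pandas for readability
print(table.group_by("animal").aggregate([("count", "sum")]).to_pandas())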

Conclusion

I hope this blog empowers you to start digging deeper into Apache Arrow and helps you to understand why we decided to invest in the future of Apache Arrow and its related projects. I also hope it gives you the foundations to start exploring how you can build your own analytics applications on top of this framework. InfluxDB’s new storage engine underscores our commitment to the greater ecosystem. For instance, allowing the exportation of Parquet files gives us the opportunity to analyze our data in Rapid Miner and other analytical platforms.

My call to action for you is to check out the code here and discover some of the other processor functionality Apache Arrow offers. A lot of the content coming up will be around Apache Parquet, so if there are any products or platforms that use Parquet that you would like us to talk about, let us know. Come join us on Slack and the forums and share your thoughts. I look forward to seeing you there!