Querying InfluxDB Cloud with the Java Flight SQL Client

InfluxDB Cloud 3.0 is a time series database built on Apache Arrow and other projects from the Apache ecosystem. You can query InfluxDB Cloud through the Apache Arrow Flight SQL interface, which lets clients run SQL queries and receive the results in Arrow's columnar format. In this tutorial, we will walk through querying InfluxDB Cloud with Flight SQL using Java. The Java Flight SQL Client is part of Apache Arrow Flight, a framework for building high-performance data services that efficiently transmits large datasets over gRPC, a modern high-performance RPC framework. Try out querying InfluxDB Cloud with the Java Flight SQL Client for yourself with this repo.

Requirements and setup

This tutorial assumes that you already have a free InfluxDB Cloud account. It also assumes that you have Docker running on your machine.

Finally, you’ll need to create or obtain the following InfluxDB resources:

  • A bucket
  • A token
  • Your organization (usually the email you used to sign up for your account)

You’ll also need to write data to your InfluxDB account. The simplest way to do this is to write some line protocol manually through the UI. Navigate to Load Data > Buckets > +Add Data > Line Protocol > Enter Manually, select the bucket you want to write to, and write a point to InfluxDB. For example, you could write measurementName,tagKey=tagValue fieldKey=1.0. Or, if you want real-world line protocol data, try the NOAA air sensor dataset. For other ways to write data, see the documentation on writing data to InfluxDB Cloud 3.0.
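If you would rather generate line protocol in code than type it into the UI, here is a minimal Java sketch of formatting a single point. The LineProtocol class and its format method are hypothetical helpers, not part of any InfluxDB client library. The sketch supports tags and float fields only, escapes commas, equals signs, and spaces as line protocol requires, writes field values with Double.toString (fine for ordinary decimals like 1.0), and omits the timestamp so that InfluxDB assigns the time of the write.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical helper that formats one InfluxDB line protocol point.
// Supports tags and float fields only; no timestamp is written, so the
// server assigns the time of the write.
final class LineProtocol {

    // Measurement names must escape commas and spaces.
    static String escapeMeasurement(String s) {
        return s.replace(",", "\\,").replace(" ", "\\ ");
    }

    // Tag keys, tag values, and field keys must also escape equals signs.
    static String escapeKey(String s) {
        return s.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ");
    }

    static String format(String measurement, Map<String, String> tags, Map<String, Double> fields) {
        if (fields.isEmpty()) {
            throw new IllegalArgumentException("a point needs at least one field");
        }
        StringBuilder sb = new StringBuilder(escapeMeasurement(measurement));
        // Tags are written in key order (TreeMap), as InfluxDB's docs recommend.
        for (Map.Entry<String, String> tag : new TreeMap<>(tags).entrySet()) {
            sb.append(',').append(escapeKey(tag.getKey()))
              .append('=').append(escapeKey(tag.getValue()));
        }
        // Fields are comma-separated key=value pairs after a single space.
        StringJoiner fieldSet = new StringJoiner(",");
        for (Map.Entry<String, Double> field : new TreeMap<>(fields).entrySet()) {
            fieldSet.add(escapeKey(field.getKey()) + "=" + field.getValue());
        }
        return sb.append(' ').append(fieldSet).toString();
    }

    public static void main(String[] args) {
        String line = format("measurementName", Map.of("tagKey", "tagValue"), Map.of("fieldKey", 1.0));
        System.out.println(line); // measurementName,tagKey=tagValue fieldKey=1.0
    }
}
```

The printed line is the same example point shown above, so you can paste it into the UI or send it with any write method.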

Code walkthrough

Let’s break down the code into smaller pieces to understand what’s happening.

  1. Import required classes: We start by importing the required classes from Apache Arrow Flight, Flight SQL, and the Arrow memory and vector modules.
    import org.apache.arrow.flight.*;
    import org.apache.arrow.flight.auth2.BearerCredentialWriter;
    import org.apache.arrow.flight.grpc.CredentialCallOption;
    import org.apache.arrow.flight.sql.FlightSqlClient;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;
    
  2. Define the main class: We define a JavaExample class with a main method where our code will execute.
    public class JavaExample { public static void main(String[] args) {...} }
    
  3. Set up the connection: We define the host variable, which should be the InfluxDB instance URL without the protocol (“https://”) part.
    String host = "<host without https://, e.g. us-east-1-1.aws.cloud2.influxdata.com>";
    
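  4. Next, we define the query that we want to execute. Replace measurementName with the name of a measurement in your bucket. Unquoted SQL identifiers are converted to lowercase, so wrap mixed-case names in double quotes:
    String query = "SELECT * FROM \"measurementName\"";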
    
  5. Now, we create a Location object using the forGrpcTls method. A Location describes where the Flight service lives, here a gRPC endpoint secured with TLS (Transport Layer Security) on port 443; no connection is opened at this point.
    Location location = Location.forGrpcTls(host, 443);
    
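  6. Authentication: We set up authentication using the BearerCredentialWriter and CredentialCallOption classes. BearerCredentialWriter sends the token in an "Authorization: Bearer" header, and CredentialCallOption attaches that header to every call you pass the option to. Replace 'your token' with your actual InfluxDB authentication token.
    CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter("your token"));
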
  7. Initialize the FlightClient: First, create a BufferAllocator using RootAllocator to manage memory allocation:
    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

  8. We create a custom interceptor (a FlightClientMiddleware) that adds a database header to every request; InfluxDB uses this header to determine which bucket to query. Replace 'your bucket' with the name of your InfluxDB bucket.
    FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
        // Add the "database" header to every outgoing request
        @Override public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
            outgoingHeaders.insert("database", "your bucket");
        }
        @Override public void onHeadersReceived(CallHeaders incomingHeaders) { }
        @Override public void onCallCompleted(CallStatus status) { }
    };

  9. Now, we build the FlightClient using the allocator, location, and the custom interceptor:
    FlightClient client = FlightClient.builder(allocator, location).intercept(f).build();

  10. Create the SQL client by passing the FlightClient to the FlightSqlClient constructor:
    FlightSqlClient sqlClient = new FlightSqlClient(client);

  11. Call the execute method with the query and the auth option to run the query on the server. It returns a FlightInfo object, which contains the result schema and one or more endpoints; each endpoint holds a ticket that identifies part of the result. Pass each ticket to getStream() to fetch that part as a stream of record batches, then print each batch with contentToTSVString(), which formats it as tab-separated text.
    FlightInfo flightInfo = sqlClient.execute(query, auth);
    // A result can be split across several endpoints, so read every one of them
    for (FlightEndpoint endpoint : flightInfo.getEndpoints()) {
        // try-with-resources closes each stream once it has been read
        try (FlightStream stream = sqlClient.getStream(endpoint.getTicket(), auth)) {
            // next() loads the next record batch into the stream's VectorSchemaRoot
            while (stream.next()) {
                VectorSchemaRoot root = stream.getRoot();
                System.out.println(root.contentToTSVString());
            }
        } catch (Exception e) {
            // Handle the error here, e.g. log it
            System.out.println("Error reading query results: " + e.getMessage());
        }
    }
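The walkthrough hard-codes the host, token, and query. The following is a minimal, JDK-only sketch of deriving those values instead, under a few assumptions: the host is taken from the full URL shown in the InfluxDB Cloud UI, the token comes from an environment variable named INFLUX_TOKEN (a hypothetical name that the example repo does not define), and the measurement name is double-quoted so that a mixed-case name like measurementName keeps its case in SQL. QueryConfig and its methods are illustrative helpers; the grpc+tls://host:443 string is the URI form that Location.forGrpcTls(host, 443) describes.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;

// Illustrative helpers for building the settings used in the walkthrough.
final class QueryConfig {

    // Accepts a bare host or a full URL such as
    // "https://us-east-1-1.aws.cloud2.influxdata.com" and returns only the host.
    static String hostFrom(String urlOrHost) {
        String value = urlOrHost.trim();
        if (!value.contains("://")) {
            return value;
        }
        String host = URI.create(value).getHost();
        if (host == null) {
            throw new IllegalArgumentException("no host in " + urlOrHost);
        }
        return host;
    }

    // Builds the grpc+tls URI string that Location.forGrpcTls(host, port) represents.
    static String grpcTlsUri(String host, int port) throws URISyntaxException {
        return new URI("grpc+tls", null, host, port, null, null, null).toString();
    }

    // Double-quotes a SQL identifier and doubles any embedded quotes,
    // so the identifier is not folded to lowercase.
    static String quoteIdentifier(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    static String selectAll(String measurement) {
        return "SELECT * FROM " + quoteIdentifier(measurement);
    }

    // Reads a required setting; pass System.getenv() in a real program.
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("missing environment variable " + name);
        }
        return value;
    }

    public static void main(String[] args) throws URISyntaxException {
        String host = hostFrom("https://us-east-1-1.aws.cloud2.influxdata.com");
        System.out.println(host);                         // us-east-1-1.aws.cloud2.influxdata.com
        System.out.println(grpcTlsUri(host, 443));        // grpc+tls://us-east-1-1.aws.cloud2.influxdata.com:443
        System.out.println(selectAll("measurementName")); // SELECT * FROM "measurementName"
        // Hypothetical variable name:
        // String token = require(System.getenv(), "INFLUX_TOKEN");
    }
}
```

In the Flight code, host would feed Location.forGrpcTls(host, 443), the token would replace "your token", and selectAll(...) would replace the query string. Keeping the token out of source code also keeps it out of the Docker image and version control.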

You can find the full script here.
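The walkthrough does not close the allocator, client, or streams. In Arrow Java these objects implement AutoCloseable, and a RootAllocator throws an exception on close() if memory allocated from it has not been released, so the order matters: streams first, then the client, then the allocator. The sketch below uses a hypothetical TrackedResource stand-in instead of the Arrow types so that it runs with the JDK alone; it shows that try-with-resources closes resources in the reverse of the order they are declared, which is exactly the order needed here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an Arrow resource; records its name when closed.
final class TrackedResource implements AutoCloseable {
    private final String name;
    private final List<String> closeLog;

    TrackedResource(String name, List<String> closeLog) {
        this.name = name;
        this.closeLog = closeLog;
    }

    @Override
    public void close() {
        closeLog.add(name);
    }
}

final class CloseOrderDemo {

    // Declares resources in the order the walkthrough creates them
    // and returns the order in which they were closed.
    static List<String> run() {
        List<String> closeLog = new ArrayList<>();
        try (TrackedResource allocator = new TrackedResource("allocator", closeLog);
             TrackedResource client = new TrackedResource("client", closeLog);
             TrackedResource stream = new TrackedResource("stream", closeLog)) {
            // In the real program, the query and the stream.next() loop would run here.
        }
        return closeLog;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [stream, client, allocator]
    }
}
```

With the real classes, the same pattern would declare the RootAllocator, then the FlightSqlClient (whose close() also closes the wrapped FlightClient), then each FlightStream in one try-with-resources statement.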

Query InfluxDB Cloud with Java Flight SQL

To run the example in the corresponding repo, follow these steps:

  1. Clone the repo and cd into it.
  2. Run docker build -t myimage . (the trailing dot tells Docker to use the current directory as the build context)
  3. Run docker run myimage

Resources and conclusion

Take a look at the following resources. They helped me build this example, and they can help you on your journey with querying InfluxDB Cloud:

  1. Reference documentation for Arrow Flight
  2. Reference documentation for Arrow Flight SQL for the Java Client
  3. A blog post on InfluxDB, Flight SQL, Pandas, and Jupyter Notebooks Tutorial
  4. A blog post on TL;DR InfluxDB Tech Tips: Downsampling with Flight SQL and AWS Lambda

I hope this blog post inspires you to explore InfluxDB Cloud and take advantage of Flight SQL to transport large datasets from InfluxDB for processing with the tools of your choice. If you need any help, please reach out using our community site or Slack channel. I’d love to hear about what you’re trying to achieve and what features you’d like to see in InfluxDB.