Downsampling to InfluxDB Cloud Dedicated with Java Flight SQL Client

InfluxDB Cloud Dedicated is a hosted, managed InfluxDB Cloud cluster dedicated to a single tenant. The InfluxDB time series platform is designed to handle high write and query loads, so you can leverage InfluxDB Cloud Dedicated for your specific time series use case. In this tutorial, we walk through reading data from InfluxDB Cloud Dedicated using the Java Flight SQL client. The Java Flight SQL client is part of Apache Arrow Flight, a framework for building high-performance data services, which transmits large datasets efficiently over gRPC, a modern high-performance RPC framework. Try querying InfluxDB Cloud Dedicated with the Java Flight SQL client for yourself with this repo.

Requirements and setup

This tutorial assumes that you already have an InfluxDB Cloud Dedicated account and that Docker is available on your machine. Keep your cluster URL handy; you will need it throughout.

You will need to create the following:

  • A source database

  • A target database

  • A source token with Read/Write permissions

  • A target token with Read/Write permissions

During the initial setup, you need to load the data you want to downsample. For this tutorial, I used the NOAA air sensor dataset, but other data sources, such as Telegraf, work as well. Here is a simple Telegraf configuration to load the data. Check out the following documentation on writing data to InfluxDB Cloud Dedicated for other ingest methods.
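
If you'd rather seed the data programmatically than through Telegraf, here is a minimal sketch using the influxdb-client-java library. Treat it as an illustration under assumptions: the URL and database name are placeholders, and the line-protocol record only mimics the shape of the NOAA air sensor data (an airSensors measurement with temperature, humidity, and co fields).

    import com.influxdb.client.InfluxDBClient;
    import com.influxdb.client.InfluxDBClientFactory;
    import com.influxdb.client.WriteApiBlocking;
    import com.influxdb.client.domain.WritePrecision;

    public class SeedAirSensorData {
        public static void main(String[] args) {
            // Placeholders: substitute your cluster URL and source database name.
            String url = "https://cluster-id.a.influxdb.io";
            char[] token = System.getenv("READ_TOKEN").toCharArray();
            InfluxDBClient client = InfluxDBClientFactory.create(url, token, "", "source-database");

            // Write one line-protocol record mimicking the NOAA air sensor shape.
            WriteApiBlocking writeApi = client.getWriteApiBlocking();
            writeApi.writeRecord(WritePrecision.NS,
                    "airSensors,sensor_id=TLM0100 temperature=71.2,humidity=35.1,co=0.5");
            client.close();
        }
    }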

Code walkthrough

  1. Import required classes: We start by importing the required classes from Apache Arrow Flight and other necessary libraries.
    // Arrow Flight, Flight SQL, and JDK classes used throughout the walkthrough
    // (InfluxDB client imports are omitted here for brevity).
    import org.apache.arrow.flight.CallHeaders;
    import org.apache.arrow.flight.CallStatus;
    import org.apache.arrow.flight.FlightClient;
    import org.apache.arrow.flight.FlightClientMiddleware;
    import org.apache.arrow.flight.FlightInfo;
    import org.apache.arrow.flight.FlightStream;
    import org.apache.arrow.flight.Location;
    import org.apache.arrow.flight.Ticket;
    import org.apache.arrow.flight.auth2.BearerCredentialWriter;
    import org.apache.arrow.flight.grpc.CredentialCallOption;
    import org.apache.arrow.flight.sql.FlightSqlClient;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.util.concurrent.CountDownLatch;
  2. Define the main class: We define a CloudReadWriteExample class with a main method where our code will execute.
    public class CloudReadWriteExample {
      public static void main(String[] args) {
  3. Set up the connection configurations: We expect the following variables to be set in the environment:

    Source/target host: The source host is the Cloud Dedicated cluster URL without the protocol (“https://”), because Location.forGrpcTls expects a bare hostname; the target host is the full cluster URL including the protocol, as required by the InfluxDB client.

    Source/target database: In this example, the source database stores the air sensor data before downsampling, and the target database stores the downsampled data.

    Source/target token: Tokens for the source and target databases, respectively.

    Query: In this example, the query variable contains the downsampling query. You can replace it with any query that suits your requirements.

     //ReadConfigs
        private static final String SOURCE_HOST = System.getenv("SOURCE_HOST");
        private static final String READ_TOKEN = System.getenv("READ_TOKEN");
        private static final String SOURCE_DATABASE_NAME = System.getenv("SOURCE_DATABASE_NAME");
        private static final String DOWNSAMPLE_QUERY = System.getenv("DOWNSAMPLE_QUERY");

        //WriteConfigs
        private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_CLUSTER_URL");
        private static final String WRITE_TOKEN = System.getenv("WRITE_TOKEN");
        private static final String TARGET_DATABASE_NAME = System.getenv("TARGET_DATABASE_NAME");
        private static final String TARGET_TABLE_NAME = System.getenv("TARGET_TABLE_NAME");
  4. Create an interceptor that injects header metadata, in this case the database name, into every request.
    FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
                @Override
                public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
                    // Attach the database name as gRPC header metadata on every outgoing call.
                    outgoingHeaders.insert("database", SOURCE_DATABASE_NAME);
                }
                @Override public void onHeadersReceived(CallHeaders incomingHeaders) {}
                @Override public void onCallCompleted(CallStatus status) {}
            };
  5. Create a ‘Location’ object using the forGrpcTls method, which sets up the connection with gRPC and Transport Layer Security (TLS) encryption.
    Location location = Location.forGrpcTls(SOURCE_HOST, 443);
  6. Authentication: We set up authentication using the BearerCredentialWriter and CredentialCallOption classes. Set the READ_TOKEN environment variable to your Cloud Dedicated (i.e., source database) authentication token.
    CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(READ_TOKEN));
  7. After successful authentication, the sqlClient.execute method returns a FlightInfo object that contains metadata and a list of endpoints. Each endpoint contains a list of addresses where you can retrieve the data and a ticket value that identifies the data to retrieve. (See the end-to-end sketch after this walkthrough for how sqlClient itself is constructed.)
    FlightInfo flightInfo = sqlClient.execute(DOWNSAMPLE_QUERY, auth);
    
         // Extract the Flight ticket from the response.
         Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
  8. Retrieve the stream data of the query from the endpoint.
     // Pass the ticket to request the Arrow stream data from the endpoint.
            final FlightStream stream = sqlClient.getStream(ticket, auth);
  9. Create the InfluxDB WriteApi using the following code (the complete configuration appears in the next step):
    WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
                            .batchSize(5000)
                            .flushInterval(1000)
                            // ... remaining options shown in the next step ...
                            .build());
  10. The code below iterates over the stream data, prepares a point for each row, and writes the prepared point into the target database.
     while (stream.next()) {
                try {
                    // Get the current vector data from the stream.
                    final VectorSchemaRoot root = stream.getRoot();
                    System.out.println(root.contentToTSVString());

                    // Note: this opens a new client per Arrow batch; hoisting it out of the loop would be more efficient.
                    InfluxDBClient influxDBClient = InfluxDBClientFactory.create(TARGET_CLUSTER_URL, WRITE_TOKEN.toCharArray(), "", TARGET_DATABASE_NAME);
                    CountDownLatch countDownLatch = new CountDownLatch(1);
                    try (WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
                            .batchSize(5000)
                            .flushInterval(1000)
                            .backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST)
                            .bufferLimit(10000)
                            .jitterInterval(1000)
                            .retryInterval(5000)
                            .build())) {
                        // The success listener releases the latch once the batch is written.
                        writeApi.listenEvents(WriteSuccessEvent.class, (value) -> countDownLatch.countDown());
                        writeAsPoints(writeApi, root);
                        writeApi.flush();
                    }
                    influxDBClient.close();
                } catch (Exception e) {
                    // Handle exceptions.
                    System.out.println("Error writing stream data to the target database: " + e.getMessage());
                }
            }
  11. The code below is a utility method that prepares a point from each row of the Arrow stream data.
    private static void writeAsPoints(WriteApi writeApi, VectorSchemaRoot root) {
            int fields = root.getSchema().getFields().size();
            int rowCount = root.getRowCount();
            for (int i = 0; i < rowCount; i++) {
                Point point = Point.measurement(TARGET_TABLE_NAME);
                for (int j = 0; j < fields; j++) {
                    String fieldName = root.getSchema().getFields().get(j).getName();
                    ArrowType fieldType = root.getSchema().getFields().get(j).getType();
                    if (fieldName.equalsIgnoreCase("time") && fieldType instanceof ArrowType.Timestamp) {
                        // Convert the Arrow timestamp into an Instant for the point's timestamp.
                        point.time(((LocalDateTime) root.getFieldVectors().get(j)
                            .getObject(i)).atZone(ZoneId.systemDefault()).toInstant(), WritePrecision.NS);
                    } else {
                        // Write every other column as a string field.
                        point.addField(fieldName, root.getFieldVectors().get(j).getObject(i).toString());
                    }
                }
                writeApi.writePoint(TARGET_DATABASE_NAME, TARGET_TABLE_NAME, point);
            }
        }
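
For orientation, here is a minimal sketch of how steps 4 through 8 fit together in main(). This is not the full script: the allocator wiring (BufferAllocator and RootAllocator from org.apache.arrow.memory) is an assumption about setup the walkthrough doesn't show, the fallback query's table and column names (airSensors, temperature) are hypothetical stand-ins for your own schema, and the enclosing method is assumed to declare throws Exception.

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    Location location = Location.forGrpcTls(SOURCE_HOST, 443);

    // Build the Flight client with the database-header middleware from step 4,
    // then wrap it in a Flight SQL client.
    try (FlightSqlClient sqlClient = new FlightSqlClient(
            FlightClient.builder(allocator, location).intercept(f).build())) {

        CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(READ_TOKEN));

        // Hypothetical fallback if DOWNSAMPLE_QUERY is unset; adjust names to your schema.
        String query = DOWNSAMPLE_QUERY != null ? DOWNSAMPLE_QUERY
                : "SELECT DATE_BIN(INTERVAL '1 hour', time) AS time, "
                + "AVG(temperature) AS temperature FROM airSensors GROUP BY 1";

        FlightInfo flightInfo = sqlClient.execute(query, auth);
        Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
        try (FlightStream stream = sqlClient.getStream(ticket, auth)) {
            while (stream.next()) {
                // ... write each VectorSchemaRoot to the target database (step 10).
            }
        }
    }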

You can find the full script here.

Query, perform downsampling, and write data back to InfluxDB Cloud Dedicated with Java Flight SQL

To run the example in the corresponding repo, follow these steps:

  1. Clone the repo and cd into it.

  2. Run docker build -t myimage .

  3. Run docker run myimage (see the example invocation with environment variables below).
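
Since the example reads its configuration from environment variables, you will typically pass them to the container with docker run -e. An illustrative invocation follows; the variable names come from the code walkthrough above, while every value shown is a placeholder:

    docker run \
      -e SOURCE_HOST=cluster-id.a.influxdb.io \
      -e READ_TOKEN=<source-token> \
      -e SOURCE_DATABASE_NAME=<source-database> \
      -e DOWNSAMPLE_QUERY="SELECT ..." \
      -e TARGET_CLUSTER_URL=https://cluster-id.a.influxdb.io \
      -e WRITE_TOKEN=<target-token> \
      -e TARGET_DATABASE_NAME=<target-database> \
      -e TARGET_TABLE_NAME=<target-table> \
      myimage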

Visualizing the data with Grafana

Use Grafana to query and visualize data stored in an InfluxDB Cloud Dedicated database. InfluxDB Cloud Dedicated supports both SQL and InfluxQL query languages. Install the Grafana FlightSQL plugin to query InfluxDB with SQL using the Flight SQL protocol.

This Grafana installation link provides instructions on how to install Grafana and visualize the data.

Querying the data with Grafana

You can use the Flight SQL connection data source to query the data in a Cloud Dedicated database.

Resources and conclusion

Take a look at the following documentation. It helped me build this example, and it can help you as you query InfluxDB Cloud Dedicated:

  1. Reference documentation for Arrow Flight.

  2. Reference documentation for Arrow Flight SQL for the Java Client.

  3. InfluxDB Cloud documentation for Querying data with Arrow Flight SQL in Python.

  4. A blog post on InfluxDB, Flight SQL, Pandas, and Jupyter Notebooks Tutorial.

  5. A blog post on Downsampling with Flight SQL and AWS Lambda.

I hope this blog post inspires you to explore InfluxDB Cloud Dedicated and take advantage of Flight SQL to transport large datasets from InfluxDB for data processing with the tools of your choice. If you need any help, please reach out using our community site or Slack channel. I’d love to hear about what you’re trying to achieve and what features you’d like InfluxDB to have.