Outer Joins in Flux

Navigate to:

Joins are a common transformation in any query language, and as part of the effort to make Flux an increasingly valuable tool for our users, the engineers on InfluxData’s query team created, and continue to maintain, two separate join functions. And while these solutions have met some of our users’ needs, they both lack one key feature: support for outer joins.

This has been one of the most frequent requests from our community members, and perhaps one of the most notable drawbacks of using Flux in place of some other query language.

That’s why the query team is happy to announce that, if you were one of the many community members waiting for outer join support in Flux, the wait is over! Flux has a new join package that improves upon the previous two joins both in terms of performance and feature parity with other query languages, including (drum roll…) support for outer joins!

Use the join package in your Flux scripts

Import the join package by adding the following line to the beginning of your script:

import "join"

The join package provides six new functions. I will use the tables() function for my code examples, because it’s the most flexible. The other functions use the same underlying implementation — they just set some default arguments for the sake of user convenience and code readability. So, if you already know what type of join you need for your query, you can save yourself some typing by using one of the following instead:

Define your inputs

First, define your left and right input table streams. These can come from any source, but the key thing to remember is that join will only compare tables with the same group key when considering which rows to join. So, if your two inputs are grouped on different columns, or if their tables don’t share any group keys, you may not get the results you expect.

Pass in your left and right inputs as arguments to the left and right parameters respectively. All of the functions in the join package require that you explicitly set an argument for right. The left parameter, however, can be omitted in favor of pipe-forwarded data. The following two code blocks are functionally identical.

import "join"

tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...

join.tables(
    left: tbl_1,
    right: tbl_2,
    ...
)
import "join"

tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...

tbl_1 |> join.tables(
    right: tbl_2,
    ...
)

The ‘on’ function

Next, we need to define a function for the on parameter. The join transformation uses this function to compare records from the left and right inputs that have the same group key. If the function evaluates to true, the two records being compared will be joined in the final output table.

This function takes two arguments, a left record (1) and a right record (r), and the function body needs to follow a fairly strict format: a single boolean expression, consisting of one or more equality comparisons (==) between a property of 1 and a property of r, chained together by the and operator.

import "join"

tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...

join.tables(
    left: tbl_1,
    right: tbl_2,
    on: (l, r) => l._time == r._time and r.id == l.label
    ...
)

Join methods and the ‘as’ function

There are two arguments left to define, and it’s important to understand how they work together.

The method (as its name implies) determines the method used to join the two tables: inner, left, right, or full.

The as function defines the joined output of two matching rows. Similar to the on function, it takes a left and right record (1 and r, respectively) as its arguments. Its job is to return a new record built from the component properties of 1 and r. The returned record will be included in the final output of the join.

Note that the return value of the as function needs to maintain the group key of the inputs. If you define an output record that modifies or excludes the value of a group column, join returns an error.

The details of how to build the output record vary slightly depending on the join method used. An inner join will skip any unmatched records, allowing us to define a fairly straightforward as function:

import "join"

tbl_1 = from(...) |> ...
tbl_2 = from(...) |> ...

join.tables(
    method: "inner"
    left: tbl_1,
    right: tbl_2,
    on: (l, r) => l._time == r._time and r.id == l.label
    as: (l, r) => ({l with r_val: r._value})
)

If, however, it’s performing any kind of outer join (i.e., left, right, or full), we need to more carefully consider how we construct our output record.

When join finds a record in one input that doesn’t have a matching record in the other input, it will check the join method and, if appropriate, substitute a default record. A default record has the same schema that the missing record would, but only the group key columns are populated. All other values are null.

So, when defining the return value of our as function in an outer join, we need to pay special attention to where we pull its properties from.

A left join will produce one or more output rows for every record in the left input. As a result, the l argument is guaranteed to not be a default record. We do not have the same guarantee for r, so if we build our output record with any non-group-key values from r, we should expect that those will sometimes be null.

The opposite is true for a right join. In a right join, r is guaranteed to not be a default record, while 1 has no such guarantees. So, if we want to ensure that certain values will not be null in the output record, we should pull those values from r.

Full outer joins are a little trickier. In a full outer join, it’s possible for either 1 or r to be a default record. The only guarantee we have is that they will never both be default records at the same time. One of them will have the non-null values we need for our output record, but we can’t be sure which one, so we have to check.

import "array"
import "array"
import "join"

right =
    array.from(
        rows: [
            {_time: 2022-06-01T00:00:00Z, _value: 1, id: "a", key: 1},
            {_time: 2022-06-01T00:00:00Z, _value: 2, id: "b", key: 1},
            {_time: 2022-06-01T00:00:00Z, _value: 3, id: "d", key: 1},
        ],
    )
        |> group(columns: ["key"])
left =
    array.from(
        rows: [
            {_time: 2022-06-01T00:00:00Z, _value: 12.34, label: "a", key: 1},
            {_time: 2022-06-01T00:00:00Z, _value: 56.78, label: "c", key: 1},
            {_time: 2022-06-01T00:00:00Z, _value: 90.12, label: "d", key: 1},
        ],
    )
        |> group(columns: ["key"])

join.tables(
    method: "full",
    left: left,
    right: right,
    on: (l, r) => l._time == r._time and l.label == r.id,
    as: (l, r) => {
        time = if exists l._time then l._time else r._time
        label = if exists l.label then l.label else r.id

        return {
            label: label,
            v_left: l._value,
            key: r.key,
            v_right: r._value,
            _time: time,
        }
    },
)

In this example, we’re using the exists keyword to see if 1 or r has the non-null values we need for the _time and label properties. We are still pulling values directly from 1 and r for v_left and v_right because we expect those to sometimes be null in the final output. We are also pulling the value for the key property directly from r, because it’s part of the group key and therefore guaranteed to be non-null. We could just as easily have used l.key instead.

Running the example script above produces the following output:

script-output

As we would expect, the full outer join produces a joined row for every record in both inputs, even those without a match in the opposite table.

Those are the basics of how to use the new Flux join package. We on the query team encourage you to try it out and give us your feedback!

Keep in mind that, as with any open-source project, the maintenance and development of Flux is an ongoing effort, and the join package is no exception. If you find that a function in the join package does not behave as expected, or doesn’t meet your needs, please let us know by either posting on our community forums or by opening an issue in the Flux repository on GitHub.


1 Joining per group key allows us to, in many cases, significantly reduce the number of row comparisons the join transformation needs to make, and thereby improve performance.

2 You might wonder why the new join package uses a predicate function with such strictly enforced guidelines rather than just a list of column names. The main reason is that, perhaps ironically, this method offers the greatest flexibility. Not only can we compare columns with different names in the left and right tables, but we may be able to leverage this function in the future to allow for more complex join predicates.

For now, enforcing this format allows Flux to ensure that all joins are equi-joins, meaning the only criteria for whether or not two rows should be joined is the equality of two sets of values. As a result, the Flux query planner is able to translate this function into a series of column name pairs, which the join transformation then uses to compare records from the left and right tables, thereby saving on compilation time.

In the future, we may be able to relax the restrictions on the predicate function to allow for more complex comparisons. Doing the same for a version of join that instead used a list of column names as its predicate would be much more difficult.

3 …assuming those values were not null in the input table