Noam Manyfler

Reputation: 61

Apache Beam SQL JOIN with Side Input

Can I SQL JOIN a PCollection and a PCollectionView (via side input) in Apache Beam SQL?

According to the Apache Beam SQL docs, a JOIN in a query is implemented by creating a PCollectionTuple, which accepts only PCollections:

// Create a PCollectionTuple containing both PCollections.
// TupleTags IDs will be used as table names in the SQL query
PCollectionTuple namesAndFoods = PCollectionTuple
    .of(new TupleTag<>("Apps"), appsRows) // appsRows from the previous example
    .and(new TupleTag<>("Reviews"), reviewsRows);

// Compute the total number of reviews
// and average rating per app
// by joining two PCollections
PCollection<Row> output = namesAndFoods.apply(
    SqlTransform.query(
        "SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) "
            + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId "
            + "GROUP BY Apps.appId"));

In my pipeline, I have a side input that reads some data from a BigQuery table. The side input is refreshed every X hours.

I tried creating PCollectionTuple like this:

PCollectionTuple rowsAndSideInput = PCollectionTuple
        .of(new TupleTag<>("PCOLLECTION"), mainCollection)
        .and(new TupleTag<>("MY_SIDE_INPUT"), collectionView.getPCollection());

But the documentation for .getPCollection() says it is an internal method that is not available in all contexts.

Do you know if it's possible to use Beam SQL JOIN with side input?

Upvotes: 1

Views: 538

Answers (1)

robertwb

Reputation: 5104

PCollectionViews are just views of PCollections intended only for use as side inputs. However, this view is being created somewhere, so you should be able to use whatever PCollection was used to create this view rather than trying to use the view itself.
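
As a rough illustration of that suggestion, here is a minimal sketch in which the same PCollection<Row> is used both to build the view and as a table in the PCollectionTuple. The schemas, the sample rows, the class name JoinWithViewSource, and the helper buildJoin are made up for the example (in-memory Create stands in for the BigQuery read), and it assumes the Beam Java SDK, the Beam SQL extension, and a runner such as the DirectRunner are on the classpath.

```java
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

public class JoinWithViewSource {

  // Hypothetical helper: builds the join and returns the joined rows.
  static PCollection<Row> buildJoin(Pipeline pipeline) {
    // Hypothetical schemas standing in for the real main and lookup data.
    Schema mainSchema =
        Schema.builder().addStringField("appId").addInt32Field("rating").build();
    Schema lookupSchema =
        Schema.builder().addStringField("appId").addStringField("appName").build();

    PCollection<Row> mainCollection =
        pipeline.apply(
            "MainRows",
            Create.of(
                    Row.withSchema(mainSchema).addValues("a1", 5).build(),
                    Row.withSchema(mainSchema).addValues("a2", 3).build())
                .withRowSchema(mainSchema));

    // The PCollection that the side input is created from
    // (in the question this would come from the BigQuery read).
    PCollection<Row> lookupRows =
        pipeline.apply(
            "LookupRows",
            Create.of(Row.withSchema(lookupSchema).addValues("a1", "Maps").build())
                .withRowSchema(lookupSchema));

    // The view can still be created and used as a side input in a ParDo.
    PCollectionView<List<Row>> collectionView = lookupRows.apply(View.<Row>asList());

    // For the SQL JOIN, pass the source PCollection rather than the view.
    PCollectionTuple rowsAndLookup =
        PCollectionTuple.of(new TupleTag<>("PCOLLECTION"), mainCollection)
            .and(new TupleTag<>("MY_SIDE_INPUT"), lookupRows);

    return rowsAndLookup.apply(
        SqlTransform.query(
            "SELECT PCOLLECTION.appId, MY_SIDE_INPUT.appName, PCOLLECTION.rating "
                + "FROM PCOLLECTION INNER JOIN MY_SIDE_INPUT "
                + "ON PCOLLECTION.appId = MY_SIDE_INPUT.appId"));
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    buildJoin(pipeline);
    pipeline.run().waitUntilFinish();
  }
}
```

This sketch uses bounded, in-memory inputs. In a streaming pipeline where the lookup data is refreshed periodically, the windowing and triggering of the two inputs would also need to be compatible for the join, which is not covered here.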

Upvotes: 1
