Reputation: 712
I have a Spark Streaming application that reads messages from a pub/sub topic (e.g. Kafka), applies some transformations to each of them, and saves them as Parquet files in GCS, partitioned by an arbitrary column. This is relatively easy to do with Structured Streaming and the Spark GCS connector. For example, each message looks like this:
{
  "app_id": "app1",
  "user_id": "u001",
  "evt_timestamp": 1617105047,
  "evt_data": { ... }
}
I read it as a Structured Streaming DataFrame, partition it by e.g. app_id and user_id, and then save it to a GCS bucket, which then looks something like this:
gs://my-bucket/data/app_id=app1/user_id=u001/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u002/XXX.part
gs://my-bucket/data/app_id=app1/user_id=u003/XXX.part
gs://my-bucket/data/app_id=app2/user_id=u001/XXX.part
...
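For reference, the Spark side is roughly the following (a minimal Java sketch; the broker address, topic name, checkpoint path and the simplified schema are placeholders, not my actual setup):

// Minimal sketch: read JSON from Kafka, parse it, and continuously write
// Parquet to GCS partitioned by app_id and user_id.
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class StreamToGcs {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("stream-to-gcs").getOrCreate();

    // evt_data omitted here; its schema isn't relevant to partitioning.
    StructType schema = new StructType()
        .add("app_id", DataTypes.StringType)
        .add("user_id", DataTypes.StringType)
        .add("evt_timestamp", DataTypes.LongType);

    Dataset<Row> events = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
        .option("subscribe", "events")                       // placeholder topic
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("m"))
        .select("m.*");

    events.writeStream()
        .format("parquet")
        .option("path", "gs://my-bucket/data")
        .option("checkpointLocation", "gs://my-bucket/checkpoints")  // placeholder
        .partitionBy("app_id", "user_id")
        .start()
        .awaitTermination();
  }
}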
I'd like to move my data processing to GCP, so that I don't have to manage my own Spark infrastructure. I could just rewrite my application to use DStreams and run it on Dataproc, but important people are reluctant to use Spark. I haven't been able to find a way to partition my data. BigQuery supports clustering, which seems to be what I need, but I still need to continuously save the data to GCS. Can this be done easily in GCP, or is my use case somehow broken?
As suggested by the accepted answer, I managed to achieve this using writeDynamic and my own implementation of FileIO.Write.FileNaming.
It roughly looks like this:
PCollection<String> pubsubMessages = ... // read json string messages from pubsub

PCollection<JsonMessage> messages = pubsubMessages
    .apply(ParDo.of(new ParseMessage()))  // convert json pubsub message to a java bean
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(2))));

FileIO.Write<Partition, JsonMessage> writer = FileIO.<Partition, JsonMessage>writeDynamic()
    .by(jsonMessage -> new Partition(/* some jsonMessage fields */))
    .via(
        Contextful.fn(JsonMessage::toRecord),  // convert message to Sink type, in this case GenericRecord
        ParquetIO.sink(OUT_SCHEMA))            // create a parquet sink
    .withNaming(part -> new PartitionFileName(/* file name based on `part` fields */))
    .withDestinationCoder(AvroCoder.of(Partition.class, Partition.SCHEMA))
    .withNumShards(1)
    .to("output");

messages.apply(writer);  // apply the dynamic write to the windowed messages
PartitionFileName can look like this:
import java.io.Serializable;

import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

class PartitionFileName implements FileIO.Write.FileNaming {

  private final String[] partNames;
  private final Serializable[] partValues;

  public PartitionFileName(String[] partNames, Serializable[] partValues) {
    this.partNames = partNames;
    this.partValues = partValues;
  }

  @Override
  public String getFilename(
      BoundedWindow window,
      PaneInfo pane,
      int numShards,
      int shardIndex,
      Compression compression) {

    // Build the Hive-style directory prefix, e.g. "app_id=app1/user_id=u001/".
    StringBuilder dir = new StringBuilder();
    for (int i = 0; i < this.partNames.length; i++) {
      dir
          .append(partNames[i])
          .append("=")
          .append(partValues[i])
          .append("/");
    }

    // The prefix already ends with "/", so just append the file name.
    String fileName = String.format("%d_%d_%d.part", shardIndex, numShards, window.maxTimestamp().getMillis());
    return dir + fileName;
  }
}
This results in a directory structure like:
output/date=20200301/app_id=1001/0_1_1617727449999.part
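The Partition class referenced in by(...) and withDestinationCoder(...) isn't shown above; it's just a small serializable bean carrying the partition values. A hypothetical sketch, with field names guessed from the example output path:

// Hypothetical sketch of the Partition destination bean; the date and appId
// fields are assumptions based on the example output above.
import java.io.Serializable;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

class Partition implements Serializable {
  static final Schema SCHEMA = ReflectData.get().getSchema(Partition.class);

  String date;
  String appId;

  Partition() {}  // no-arg constructor for Avro reflection

  Partition(String date, String appId) {
    this.date = date;
    this.appId = appId;
  }

  // equals/hashCode so that equal partitions map to the same destination.
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Partition)) return false;
    Partition p = (Partition) o;
    return Objects.equals(date, p.date) && Objects.equals(appId, p.appId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(date, appId);
  }
}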
Upvotes: 3
Views: 1254
Reputation: 1428
I believe you are looking for Pub/Sub with Apache Beam / Google Cloud Dataflow streaming pipelines.
Yes, it can do what you want without a lot of effort. You can define windows on your stream and use ParquetIO to write it to GCS.
Although not Parquet, this example reads from Pub/Sub and writes text files to GCS.
To get dynamic file name functionality, FileIO's writeDynamic with your own FileNaming implementation should work well.
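For example, a minimal non-dynamic version could look roughly like this (a sketch: the topic, bucket, OUT_SCHEMA and the ToGenericRecord DoFn are assumptions for illustration; the question author's edit above shows the writeDynamic variant):

// Sketch: read JSON strings from Pub/Sub, window them, and write Parquet files to GCS.
Pipeline pipeline = Pipeline.create();

PCollection<GenericRecord> records = pipeline
    .apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/events"))  // assumed topic
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(ParDo.of(new ToGenericRecord()));  // hypothetical DoFn: JSON string -> GenericRecord

records
    .setCoder(AvroCoder.of(GenericRecord.class, OUT_SCHEMA))
    .apply(FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(OUT_SCHEMA))
        .to("gs://my-bucket/output")      // assumed bucket
        .withSuffix(".parquet"));

pipeline.run();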
Upvotes: 2