DasHnPat

Reputation: 41

Cloud Pub/Sub to GCS, write per element (Dataflow Pipeline)

How can I write to GCS every time a message arrives from Pub/Sub? The pipeline does windowed writes, but not per-element writes. Any tips on this are much appreciated, thanks.

Example Link (https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToText.java)

Running this sample code writes the Pub/Sub messages to GCS. But when the window duration is set to 1 minute, it collects all the messages and writes them to a single file after a minute. I want each message written to a separate file.

Upvotes: 4

Views: 1368

Answers (3)

chintan sureliya

Reputation: 53

I implemented the same thing using processElement.

Sample code is below.

Pipeline Step:

pipeline_object.apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));

ProcessElement function:

// Requires com.google.cloud.storage.*, org.apache.beam.sdk.transforms.DoFn,
// org.apache.beam.sdk.values.KV and a static import of
// java.nio.charset.StandardCharsets.UTF_8.
@SuppressWarnings("serial")
static class Write_to_GCS extends DoFn<KV<String, String>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {

        // Fetch the text you need to write into the file
        String output_string = c.element().getValue();

        // Create your service object
        Storage storage = StorageOptions.getDefaultInstance().getService();

        // Upload a blob to the bucket. GCS_BUCKET_NAME and STORAGE_FILE_PATH are
        // defined elsewhere; the path must differ per element, otherwise each
        // write overwrites the previous object.
        BlobId blobId = BlobId.of(GCS_BUCKET_NAME, STORAGE_FILE_PATH);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        storage.create(blobInfo, output_string.getBytes(UTF_8));
    }
}

You will need to include the dependency below in pom.xml:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-storage</artifactId>
    <version>1.37.1</version>
</dependency>

This code creates a GCS storage service object and writes a blob to the specified path.

Upvotes: 0

Rubén C.

Reputation: 1098

You can create a Google Cloud Function to do this automatically. Cloud Functions can be triggered by 4 different events. One of them is Pub/Sub message publishing. If you want to test an example, refer to this Pub/Sub tutorial.

Your code should route each message to the desired GCS bucket, for example based on the Pub/Sub topic.
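To make the routing idea concrete, here is a minimal sketch of the logic such a function might call. It uses only the Java standard library; the class name TopicRouter, the topic names, and the bucket names are all hypothetical placeholders, and the actual upload to GCS is left out. It assumes the message body arrives base64-encoded, as is usual for Pub/Sub event payloads, so check that against the trigger type you use.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

/** Hypothetical routing helper that a Pub/Sub-triggered function could call. */
final class TopicRouter {
    // Assumed mapping from topic name to destination bucket (placeholders).
    private static final Map<String, String> BUCKET_BY_TOPIC = Map.of(
            "orders", "my-orders-bucket",
            "clicks", "my-clicks-bucket");

    // Fallback bucket for topics without an explicit mapping (placeholder).
    private static final String DEFAULT_BUCKET = "my-unrouted-bucket";

    private TopicRouter() {}

    /** Returns the bucket a message from the given topic should be written to. */
    static String bucketFor(String topic) {
        return BUCKET_BY_TOPIC.getOrDefault(topic, DEFAULT_BUCKET);
    }

    /** Decodes a base64-encoded message body into UTF-8 text. */
    static String decodeData(String base64Data) {
        return new String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8);
    }
}
```

The function body would then decode the message, look up the bucket with bucketFor, and upload the text with the storage client.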

Upvotes: 0

Oleksandr Bushkovskyi

Reputation: 825

If you need a file per message, one option is to write a simple transform like this:

package com.myapp.dataflow.transform;

import org.apache.beam.sdk.transforms.DoFn;
import com.google.cloud.storage.*;
import java.util.UUID;
import static java.nio.charset.StandardCharsets.UTF_8;

public class StringToGcsFile extends DoFn<String, Blob> {
    // transient: the client is created per worker in setup(), not serialized with the DoFn
    private transient Storage storage;
    private String bucketName = "my-bucket";

    @Setup
    public void setup() {
        storage = StorageOptions.getDefaultInstance().getService();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Each message needs its own object name, otherwise every write
        // overwrites the same object. A UUID is one simple strategy.
        String blobName = UUID.randomUUID() + ".txt";

        // Upload a blob to the bucket
        BlobId blobId = BlobId.of(bucketName, blobName);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        Blob blob = storage.create(blobInfo, c.element().getBytes(UTF_8));

        c.output(blob);
    }
}

Maven dependency:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-storage</artifactId>
    <version>1.35.0</version>
</dependency>
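As for the object-name strategy mentioned in the transform, here is one possible sketch using only the Java standard library. The helper name BlobNames, the "events" prefix, and the date-based layout are assumptions for illustration, not part of Beam or the storage client. The idea is a UTC date prefix so objects can be browsed by day, plus a random UUID so two messages never share a name.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

/** Hypothetical helper that builds a unique GCS object name per message. */
final class BlobNames {
    // Formats an instant as a UTC day path such as "2024/01/31".
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    private BlobNames() {}

    /** Returns "<prefix>/<yyyy>/<MM>/<dd>/<random-uuid>.txt". */
    static String uniqueName(String prefix, Instant now) {
        return prefix + "/" + DAY.format(now) + "/" + UUID.randomUUID() + ".txt";
    }
}
```

Inside processElement you could then call something like BlobNames.uniqueName("events", Instant.now()) wherever the blob name is needed.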

Upvotes: 2
