Reputation: 41
How can you write to GCS every time you receive a message from Pub/Sub? It does windowed writes but not per-element writes. Any tip on this matter is much appreciated, thanks.
Running this sample code writes Pub/Sub messages to GCS, but with the duration set to 1 min it collects all the messages and writes them to a single file after the minute elapses. I want each message written to a different file instead.
Upvotes: 4
Views: 1368
Reputation: 53
I implemented the same thing using processElement. Below is the sample code.
Pipeline Step:
pipeline_object.apply("Manually write events to GCS", ParDo.of(new Write_to_GCS()));
ProcessElement function:
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import com.google.cloud.storage.*;
import static java.nio.charset.StandardCharsets.UTF_8;

@SuppressWarnings("serial")
static class Write_to_GCS extends DoFn<KV<String, String>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // Fetch the text you need to write into the file
        String output_string = c.element().getValue();
        // Create the GCS service object
        Storage storage = StorageOptions.getDefaultInstance().getService();
        // Upload a blob to the specified bucket and path
        BlobId blobId = BlobId.of(GCS_BUCKET_NAME, STORAGE_FILE_PATH);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        storage.create(blobInfo, output_string.getBytes(UTF_8));
    }
}
You will need to include the following dependency in pom.xml:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.37.1</version>
</dependency>
What this code does is create a GCS storage service object and write each element as a blob to the specified path. Note that with a fixed STORAGE_FILE_PATH every element overwrites the same object, so you will want to derive a unique object name per message.
Upvotes: 0
Reputation: 1098
You can create a Google Cloud Function to do this automatically. Cloud Functions can be triggered by 4 different event types, one of which is Pub/Sub message publishing. If you want to test an example, refer to this Pub/Sub tutorial.
You should write your code to correctly redirect each message to the desired GCS bucket and object, for example based on the Pub/Sub topic.
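As a rough sketch of this approach (the bucket name, object prefix, and helper method below are all illustrative placeholders, not part of the tutorial), a background function subscribed to the topic could decode each message and write it straight to a bucket:

```java
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

public class PubSubToGcs implements BackgroundFunction<PubSubToGcs.PubsubMessage> {

    // Minimal event payload shape for a Pub/Sub-triggered function
    public static class PubsubMessage {
        public String data; // base64-encoded message body
    }

    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    // Pure helper, kept separate so it can be exercised without GCP access
    public static String decode(String base64Data) {
        return new String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8);
    }

    @Override
    public void accept(PubsubMessage message, Context context) {
        String body = decode(message.data);
        // One object per message: a UUID keeps object names unique
        BlobId blobId = BlobId.of("my-bucket", "messages/" + UUID.randomUUID() + ".txt");
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        STORAGE.create(blobInfo, body.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because the function runs once per published message, each message naturally lands in its own GCS object without any windowing.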
Upvotes: 0
Reputation: 825
If you need a file per message, one option would be to create a simple transform like this:
package com.myapp.dataflow.transform;
import org.apache.beam.sdk.transforms.DoFn;
import com.google.cloud.storage.*;
import static java.nio.charset.StandardCharsets.UTF_8;
public class StringToGcsFile extends DoFn<String, Blob> {
    // transient: the Storage client is not serializable, so it is
    // rebuilt per worker in @Setup rather than shipped with the DoFn
    private transient Storage storage;
    private String bucketName = "my-bucket";

    @Setup
    public void setup() {
        storage = StorageOptions.getDefaultInstance().getService();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Consider some strategy for unique object names, UUID or something,
        // otherwise every element overwrites the same object
        String blobName = "my_blob_name";
        // Upload a blob to the bucket
        BlobId blobId = BlobId.of(bucketName, blobName);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        Blob blob = storage.create(blobInfo, c.element().getBytes(UTF_8));
        c.output(blob);
    }
}
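The object-name comment in that transform matters: with a fixed blobName every message overwrites the same object. One possible naming strategy (the prefix and extension here are just placeholders) combines a timestamp with a random UUID so each message gets its own object:

```java
import java.time.Instant;
import java.util.UUID;

public class BlobNames {
    // Builds a collision-free object name,
    // e.g. "events/2018-09-01T12:00:00Z-<uuid>.txt"
    public static String forMessage(String prefix, Instant timestamp) {
        return prefix + "/" + timestamp + "-" + UUID.randomUUID() + ".txt";
    }
}
```

Inside processElement you could then call something like BlobNames.forMessage("events", Instant.now()) instead of the fixed name.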
Maven dependency:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.35.0</version>
</dependency>
Upvotes: 2