Reputation: 19
I have created a Dataflow job which reads .tsv files from a GCS bucket and inserts them into BigQuery tables. Before forming a table row, I have to read a lookup file that contains the column names. This lookup file is inside a .tar.gz archive along with other files, but I need to read only the file named column_headers.tsv.
I used FileIO.match().filepattern() to get the list of matching .tar.gz files from GCS, but the Dataflow job fails with a FileNotFoundException.
Below is the stack trace:
severity: "ERROR"
textPayload: "Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: fs:/xxx_xxxx_xxxxx/initial_load/xxxxxxxx_2022-10-11-lookup_data.tar.gz (No such file or directory)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:117)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: gs:/crs_user_events/initial_load/napaonlineglobal_2022-10-11-lookup_data.tar.gz (No such file or directory)
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
gradle_inital_load.ReadTarDir$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
org.apache.beam.sdk.io.FileIO$ReadMatches$ToReadableFileFn.process(FileIO.java:874)
org.apache.beam.sdk.io.FileIO$ReadMatches$ToReadableFileFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
... 21 more
Caused by: java.io.FileNotFoundException: fs:/xxx_xxxx_xxxx/initial_load/xxxxxx_2022-10-11-lookup_data.tar.gz (No such file or directory)
java.base/java.io.FileInputStream.open0(Native Method)
java.base/java.io.FileInputStream.open(FileInputStream.java:216)
xxxx_inital_load.ReadFilexxxx.processElement(ReadFilexxxx.java:97)
"
timestamp: "2022-12-13T17:31:06.310507157Z"
}
My pipeline code is as follows:
package xxxx_inital_load;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.io.Files;
import org.apache.beam.sdk.annotations.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
public class ReadFilexxxx {
private static final Logger LOG = LoggerFactory.getLogger(ReadFilexxxx.class);
static String outputTableProject = "xxx-x-xxxx";
static String outputTableDataset = "xxxx_events";
public static void main(String[] args) {
// TODO Auto-generated method stub
DataflowPipelineOptions dfOptions = PipelineOptionsFactory.as(MyOptions.class);
dfOptions.setProject("xxx-x-xxxx");
dfOptions.setStagingLocation("fs://xxxx_events/xxxx");
dfOptions.setRegion("us-eastx");
dfOptions.setTempLocation("fs://xxxx_events/xxxx");
dfOptions.setServiceAccount("[email protected]");
dfOptions.setSubnetwork(
"https://www.googleapis.com/compute/v1/projects/xxx-xxx-x-xxxx-xxx/regions/us-eastx/subnetworks/x-xxxxxx-xxxx-xxxx");
dfOptions.setUsePublicIps(false);
dfOptions.setRunner(DataflowRunner.class);
DataflowRunner.fromOptions(dfOptions);
Pipeline p = Pipeline.create(dfOptions);
// PCollection<String[]> fileContents =
PCollectionView<String[]> filecontents = (PCollectionView<String[]>) p
.apply("MatchFile(s)", FileIO.match().filepattern(
"fs://xxxx_events/initial_load/*.tar.gz"))
.apply("GCS File Match", FileIO.readMatches().withCompression(Compression.AUTO))
.apply("Untar and read files", ParDo.of(new DoFn<FileIO.ReadableFile, String[]>() {
//.apply("Untar and read files", ParDo.of(new DoFn<MatchResult.Metadata, String[]>() {
@ProcessElement
public void processElement(ProcessContext c) throws FileNotFoundException, IOException {
String line = null;
String tmp = null;
String[] Data = null;// = new ArrayList<>();
String filename = c.element().getMetadata().resourceId().getFilename();
filename = "fs://xxxxx_events/initial_load/".concat(filename);
LOG.info("columns file path:", filename);
TarArchiveInputStream tarInput = new TarArchiveInputStream(
new GzipCompressorInputStream(new FileInputStream(filename)));
TarArchiveEntry currentEntry = tarInput.getNextTarEntry();
BufferedReader br = null;
while (currentEntry != null) {
if (currentEntry.getName().contains("column_headers.tsv")) {
br = new BufferedReader(new InputStreamReader(tarInput)); // Read directly from tarInput
System.out.println("For File = " + currentEntry.getName());
while ((tmp = br.readLine()) != null) {
line = tmp;
Data = line.split("\t");
System.out.println("line=" + line);
}
// br.close();
}
currentEntry = tarInput.getNextTarEntry(); // You forgot to iterate to the next file
}
br.close();
tarInput.close();
c.output(Data);
}
})).apply(View.asSingleton());
PCollection<String> lines = p.apply("Read Files",
TextIO.read().from("fs://xxxx_events/initial_load/*.tsv.gz")
.withCompression(Compression.GZIP));
p.getCoderRegistry().registerCoderForClass(ReadTarDir.class, TableAndRowCoder.of());
PCollection<TableAndRow> tablerows = lines
.apply("Transform File lines into TableAndRow", ParDo.of(new DoFn<String, TableAndRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
int tabnam_idx, indx;
TableAndRow tbObj;
String tabName = null;
TableRow row = new TableRow();
String[] columns = c.sideInput(filecontents);
String[] arr = c.element().split("\t");
if (arr.length > 0) {
tabnam_idx = getIndex(columns, "channel");
indx = getIndex(columns, "page_event");
// ProductDetails
if ((arr[tabnam_idx].toString()).contains("productdetails")) {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("detail_page_view_events_idl");
// tabName = String.format("%s:%s.%s", outputTableProject,
// outputTableDataset,"Detail_Page_View_Events");
row.set("eventType", "detail-page-view");
int index = getIndex(columns, "evar6");
if (arr[getIndex(columns, "evar6")] != "") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("eventTime", arr[getIndex(columns, "date_time")]);
row.set("experimentIds", arr[getIndex(columns, "evar104")]);
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
row.set("userInfo.userId", "1");
row.set("userInfo.ipAddress", arr[getIndex(columns, "ip")]);
row.set("userInfo.userAgent", arr[getIndex(columns, "user_agent")]);
row.set("userInfo.directUserRequest", "1");
row.set("uri", arr[getIndex(columns, "page_url")]);
if (arr[getIndex(columns, "visit_referrer")] == "") {
row.set("referrerUri", "1");
} else {
row.set("referrerUri", arr[getIndex(columns, "visit_referrer")]);
}
}
// Homepage
if ((arr[tabnam_idx].toString()).contains("homepage1")) {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("home_page_view_events_idl");
// tabName = String.format("%s:%s.%s", outputTableProject,
// outputTableDataset,"Home_Page_View_Events");
row.set("eventType", "home-page-view");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
}
// Search
indx = getIndex(columns, "page_event");
if ((arr[tabnam_idx].toString()).contains("search") && arr[indx] == "0") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("search_events_idl");
// tabName = String.format("%s:%s.%s", outputTableProject,
// outputTableDataset,"Pass Table Name here");
/* create row here */
row.set("eventType", "search");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("searchQuery", arr[getIndex(columns, "evar1")]);
} else {
row.set("searchQuery", arr[getIndex(columns, "evar2")]);
}
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
}
// Browse
indx = getIndex(columns, "page_event");
if ((arr[tabnam_idx].toString()).contains("category-landing") && arr[indx] == "0") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("category_page_view_events_idl");
/* create row here */
row.set("eventType", "category-page-view");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("pageCategories", arr[getIndex(columns, "evar104")]);
}
// add-to-cart
if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "12") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("add_to_cart_events_idl");
/* create row here */
row.set("eventType", "add-to-cart");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
}
// purchase complete
indx = getIndex(columns, "page_event");
if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "1") {
tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
.concat("purchase_complete_events_idl");
/* create row here */
row.set("eventType", "home-page-view");
if (arr[getIndex(columns, "evar6")] != " ") {
row.set("visitorId", arr[getIndex(columns, "evar6")]);
} else {
row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
}
row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
row.set("productDetails.product.quantity", arr[getIndex(columns, "product_list")]);
row.set("purchaseTransaction.revenue", arr[getIndex(columns, "product_list")]);
row.set("purchaseTransaction.currencyCode", arr[getIndex(columns, "product_list")]);
}
}
LOG.info("Row:" + row.toString());
tbObj = new TableAndRow(row, tabName);
c.output(tbObj);
}
}).withSideInputs(filecontents)).setCoder(TableAndRowCoder.of());
tablerows.apply("Write to BigQuery",
BigQueryIO.<TableAndRow>write().to(line -> getTableName(line))
.withFormatFunction((TableAndRow line) -> convertToTableRow(line))
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
p.run().waitUntilFinish();
System.out.println("Pipeline Executed");
}
private static TableRow convertToTableRow(TableAndRow line) {
// TODO Auto-generated method stub
TableRow row = line.getRow();
return row;
}
public static int getIndex(String[] Data, String str) {
int index = -1;
for (int j = 0; j < Data.length; j++) {
if (Data[j].contains(str)) {
index = j;
break;
}
}
return index;
}
public static TableDestination getTableName(ValueInSingleWindow<TableAndRow> line) {
TableDestination destination;
TableAndRow row = line.getValue();
destination = new TableDestination(row.getTab_name(), null);
return destination;
}
}
Upvotes: 1
Views: 860
Reputation: 1357
I believe the issue here is that you're passing a GCS path into Java's FileInputStream, which isn't aware of GCS and doesn't know what to do with gs:// paths. Your element in this case is a Beam FileIO.ReadableFile. The methods of that class, such as open() and readFullyAsBytes(), are aware of GCS and make the appropriate calls under the hood.
Your line above that creates the TarArchiveInputStream would then look something like the following:
import java.nio.channels.Channels;

TarArchiveInputStream tarInput =
    new TarArchiveInputStream(
        new GzipCompressorInputStream(
            Channels.newInputStream(c.element().open())));
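For context, here is a minimal sketch of how your "Untar and read files" DoFn could look with that change applied. The class name ReadColumnHeadersFn is just illustrative, and note the caveat in the comments about Compression.AUTO (this is a sketch, not a drop-in replacement):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;

// Illustrative replacement for the "Untar and read files" DoFn: it opens the
// matched file through Beam's GCS-aware channel instead of FileInputStream.
class ReadColumnHeadersFn extends DoFn<FileIO.ReadableFile, String[]> {
  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    String[] columns = null;
    // Caveat: with readMatches().withCompression(Compression.AUTO), open() may
    // already return a decompressed channel for *.gz files; in that case drop
    // the GzipCompressorInputStream wrapper (or use Compression.UNCOMPRESSED).
    try (TarArchiveInputStream tarInput = new TarArchiveInputStream(
        new GzipCompressorInputStream(Channels.newInputStream(c.element().open())))) {
      TarArchiveEntry entry;
      while ((entry = tarInput.getNextTarEntry()) != null) {
        if (entry.getName().contains("column_headers.tsv")) {
          BufferedReader br = new BufferedReader(new InputStreamReader(tarInput));
          String line;
          while ((line = br.readLine()) != null) {
            columns = line.split("\t"); // last line wins, as in your original loop
          }
        }
      }
    }
    if (columns != null) {
      c.output(columns);
    }
  }
}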
Alternatively, you can see an example of using Beam constructs to read a file based on a string path (which could be a GCS path) in https://github.com/mozilla/gcp-ingestion/blob/main/ingestion-beam/src/main/java/com/mozilla/telemetry/util/BeamFileInputStream.java
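If you do need to open a file from a plain string path (as that helper does), the GCS-aware route in Beam is the FileSystems API rather than java.io. A rough sketch, assuming the registered filesystems have been initialized from your pipeline options (which is the case inside a Dataflow worker):

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;

// Sketch of a helper in the same spirit as BeamFileInputStream: resolve a string
// path (local, gs://, etc.) through Beam's FileSystems registry and expose it as
// a plain InputStream that GzipCompressorInputStream/TarArchiveInputStream can wrap.
class BeamPathOpener {
  static InputStream open(String path) throws IOException {
    MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(path);
    return Channels.newInputStream(FileSystems.open(metadata.resourceId()));
  }
}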
Upvotes: 1