Reputation: 784
I'm writing data to BigQuery and successfully gets written there. But I'm concerned with the format in which it is getting written.
Below is the format in which the data is shown when I execute any query in BigQuery :
Check the first row, the value of SalesComponent is CPS_H but its showing 'BeamRecord [dataValues=[CPS_H' and In the ModelIteration the value is ended with a square braket.
Below is the code that is used to push data to BigQuery from BeamSql:
TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of(
new TableFieldSchema().setName("SalesComponent").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("DuetoValue").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("ModelIteration").setType("STRING").setMode("REQUIRED")
));
TableReference tableSpec = BigQueryHelpers.parseTableSpec("beta-194409:data_id1.tables_test");
System.out.println("Start Bigquery");
final_out.apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(
(MyOutputClass elem) -> new TableRow().set("SalesComponent", elem.SalesComponent).set("DuetoValue", elem.DuetoValue).set("ModelIteration", elem.ModelIteration)))
.apply(BigQueryIO.writeTableRows()
.to(tableSpec)
.withSchema(tableSchema)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
p.run().waitUntilFinish();
EDIT
I have transformed BeamRecord into MyOutputClass type using below code and this also doesn't work:
PCollection<MyOutputClass> final_out = join_query.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
BeamRecord record = c.element();
String[] strArr = record.toString().split(",");
MyOutputClass moc = new MyOutputClass();
moc.setSalesComponent(strArr[0]);
moc.setDuetoValue(strArr[1]);
moc.setModelIteration(strArr[2]);
c.output(moc);
}
}));
Upvotes: 0
Views: 162
Reputation: 784
I was able to resolve this issue using below methods :
PCollection<MyOutputClass> final_out = record40.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws ParseException {
BeamRecord record = c.element();
String strArr = record.toString();
String strArr1 = strArr.substring(24);
String xyz = strArr1.replace("]","");
String[] strArr2 = xyz.split(",");
Upvotes: 0
Reputation: 2539
It looks like your MyOutputClass
is constructed incorrectly (with incorrect values). If you look at it, BigQueryIO
is able to create rows with correct fields just fine. But those fields have wrong values. Which means that when you call .set("SalesComponent", elem.SalesComponent)
you already have incorrect data in the elem
.
My guess is the problem is in some previous step, when you convert from BeamRecord
to MyOutputClass
. You would get a result similar to what you're seeing if you did something like this (or some other conversion logic did this for you behind the scenes):
BeamRecord
to string by calling beamRecord.toString()
;
BeamRecord.toString()
implementation you can see that you're getting exactly that string format;,
getting an array of strings;MyOutputClass
from that array;Pseudocode for this is something like:
PCollection<MyOutputClass> final_out =
beamRecords
.apply(
ParDo.of(new DoFn() {
@ProcessElement
void processElement(Context c) {
BeamRecord record = c.elem();
String[] fields = record.toString().split(",");
MyOutputClass elem = new MyOutputClass();
elem.SalesComponent = fields[0];
elem.DuetoValue = fields[1];
...
c.output(elem);
}
})
);
Correct way of doing something like this is to call getters on the record instead of splitting its string representation, along these lines (pseudocode):
PCollection<MyOutputClass> final_out =
beamRecords
.apply(
ParDo.of(new DoFn() {
@ProcessElement
void processElement(Context c) {
BeamRecord record = c.elem();
MyOutputClass elem = new MyOutputClass();
//get field value by name
elem.SalesComponent = record.getString("CPS_H...");
// get another field value by name
elem.DuetoValue = record.getInteger("...");
...
c.output(elem);
}
})
);
You can verify something like this by adding a simple ParDo
where you either put a breakpoint and look at the elements in the debugger, or output the elements somewhere else (e.g. console).
Upvotes: 2