Nagesh Singh Chauhan

Reputation: 784

Data is written to BigQuery but not in proper format

I'm writing data to BigQuery, and it is written there successfully, but I'm concerned about the format in which it is written.

Below is how the data appears when I run any query in BigQuery:

(Screenshot: BigQuery output)

Check the first row: the value of SalesComponent is CPS_H, but it's showing 'BeamRecord [dataValues=[CPS_H', and in ModelIteration the value ends with a square bracket.

Below is the code that is used to push data to BigQuery from BeamSql:

TableSchema tableSchema = new TableSchema().setFields(ImmutableList.of(
    new TableFieldSchema().setName("SalesComponent").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("DuetoValue").setType("STRING").setMode("REQUIRED"),
    new TableFieldSchema().setName("ModelIteration").setType("STRING").setMode("REQUIRED")
));

TableReference tableSpec = BigQueryHelpers.parseTableSpec("beta-194409:data_id1.tables_test");
System.out.println("Start Bigquery");
final_out.apply(MapElements.into(TypeDescriptor.of(TableRow.class)).via(
    (MyOutputClass elem) -> new TableRow().set("SalesComponent", elem.SalesComponent).set("DuetoValue", elem.DuetoValue).set("ModelIteration", elem.ModelIteration)))
        .apply(BigQueryIO.writeTableRows()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

p.run().waitUntilFinish();

EDIT

I have converted the BeamRecord into the MyOutputClass type using the code below, and this doesn't work either:

PCollection<MyOutputClass> final_out = join_query.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) {
        BeamRecord record = c.element();
        String[] strArr = record.toString().split(",");
        MyOutputClass moc = new MyOutputClass();
        moc.setSalesComponent(strArr[0]);
        moc.setDuetoValue(strArr[1]);
        moc.setModelIteration(strArr[2]);
        c.output(moc);
    }
}));

Upvotes: 0

Views: 162

Answers (2)

Nagesh Singh Chauhan

Reputation: 784

I was able to resolve this issue using the code below:

PCollection<MyOutputClass> final_out = record40.apply(ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(ProcessContext c) throws ParseException {
        BeamRecord record = c.element();
        String strArr = record.toString();
        // Skip the first 24 characters, i.e. the "BeamRecord [dataValues=[" prefix
        String strArr1 = strArr.substring(24);
        // Drop the closing square brackets
        String xyz = strArr1.replace("]", "");
        // Split the remaining text into the individual values
        String[] strArr2 = xyz.split(",");
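The string handling in the steps above can be tried out in plain Java without a pipeline. This is only a sketch: the prefix text is an assumption inferred from the output shown in the question, PrefixStripDemo is a hypothetical helper name, and the sample values 0.5 and iter1 used with it are made up.

```java
// Hypothetical helper, not part of Beam: reproduces the substring/replace/split steps.
final class PrefixStripDemo {
    // Assumed prefix, inferred from the output shown in the question (24 characters).
    static final String PREFIX = "BeamRecord [dataValues=[";

    // Drop the prefix, remove every ']', then split on commas.
    static String[] parse(String recordText) {
        String withoutPrefix = recordText.substring(PREFIX.length());
        String withoutBrackets = withoutPrefix.replace("]", "");
        return withoutBrackets.split(",");
    }
}
```

Two things are worth noticing when using this on a string such as "BeamRecord [dataValues=[CPS_H, 0.5, iter1]": every value after the first keeps its leading space unless it is trimmed, and a value that itself contains a comma is split into two fields, which shifts every field after it.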

Upvotes: 0

Anton

Reputation: 2539

It looks like your MyOutputClass is populated with incorrect values. BigQueryIO creates the rows with the correct fields just fine, but those fields hold the wrong values, which means that by the time you call .set("SalesComponent", elem.SalesComponent), elem already contains incorrect data.

My guess is the problem is in some previous step, when you convert from BeamRecord to MyOutputClass. You would get a result similar to what you're seeing if you did something like this (or some other conversion logic did this for you behind the scenes):

  • convert BeamRecord to a string by calling beamRecord.toString();
    • if you look at the BeamRecord.toString() implementation, you can see that it produces exactly that string format;
  • split this string on commas, getting an array of strings;
  • construct MyOutputClass from that array.

Pseudocode for this is something like:

PCollection<MyOutputClass> final_out =
  beamRecords
    .apply(
      ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
           BeamRecord record = c.element();
           // Wrong: this parses the debug string, not the field values.
           String[] fields = record.toString().split(",");
           MyOutputClass elem = new MyOutputClass();
           elem.SalesComponent = fields[0];
           elem.DuetoValue = fields[1];
           ...
           c.output(elem);
        }
      })
    );
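To see why this yields the values shown in the question, here is a plain Java sketch that needs no Beam dependency. FakeRecord is a hypothetical stand-in, not the Beam class, and its toString() format is an assumption inferred from the output in the question; the sample values 0.5 and iter1 in the usage note are made up.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for BeamRecord; NOT the Beam class.
final class FakeRecord {
    private final List<String> dataValues;

    FakeRecord(String... values) {
        this.dataValues = Arrays.asList(values);
    }

    // Typed access by position: returns the stored value unchanged.
    String getString(int index) {
        return dataValues.get(index);
    }

    @Override
    public String toString() {
        // Assumed debug format; List.toString() renders as "[a, b, c]".
        return "BeamRecord [dataValues=" + dataValues + ", dataType=(omitted)]";
    }
}

final class SplitDemo {
    // The approach described above: parse the debug string.
    static String[] viaToString(FakeRecord r) {
        return r.toString().split(",");
    }
}
```

For new FakeRecord("CPS_H", "0.5", "iter1"), the first element returned by SplitDemo.viaToString is "BeamRecord [dataValues=[CPS_H" and the third is " iter1]", which matches the symptoms in the question, whereas getString(0) returns plain "CPS_H".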

The correct way to do this is to call getters on the record instead of splitting its string representation, along these lines (pseudocode):

PCollection<MyOutputClass> final_out =
      beamRecords
        .apply(
          ParDo.of(new DoFn<BeamRecord, MyOutputClass>() {

            @ProcessElement
            public void processElement(ProcessContext c) {
               BeamRecord record = c.element();
               MyOutputClass elem = new MyOutputClass();

               // get field value by name; pass the field (column) name, not a value such as "CPS_H"
               // ("SalesComponent" assumes the query output column has that name)
               elem.SalesComponent = record.getString("SalesComponent");

               // get another field value by name, using the getter that matches its type
               elem.DuetoValue = record.getInteger("...");
               ...
               c.output(elem);
            }
          })
        );

You can verify something like this by adding a simple ParDo where you either put a breakpoint and look at the elements in the debugger, or output the elements somewhere else (e.g. console).

Upvotes: 2
