Reputation: 4472
I am trying to write an Array of Structs field from my Dataflow pipeline to BigQuery. The schema of the generated table is correct, but no data gets populated in the fields.
My DoFn function:
/**
 * Holder for the DoFn that expands IP block rows into one row per IP address.
 */
public class ProcessIpBlocks {
/**
 * For each input row, emits one output row per address in the inclusive range
 * [start_ip, end_ip]. Every output row carries "start_ip" and "ip"; when the input row has a
 * non-empty "postal_code", an "atr" value built from org.json objects is attached as well.
 */
public static class IpBlocksToIp extends DoFn<TableRow, TableRow> {
private static final long serialVersionUID = 1L;
@Override
public void processElement(ProcessContext c) throws JSONException {
TableRow row = c.element();
// NOTE(review): dateFormat is never used in this method.
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar cal = Calendar.getInstance();
// Range bounds are read as decimal strings; a missing bound stays at 0.
long startIp = 0L, endIp = 0L;
if(row.get("start_ip") != null)
startIp = Long.parseLong((String)row.get("start_ip"));
if(row.get("end_ip") != null)
endIp = Long.parseLong((String)row.get("end_ip"));
// One output row per address in the inclusive range.
for(long i= startIp; i<=endIp; i++)
{
TableRow outputRow = new TableRow();
outputRow.set("start_ip", startIp);
outputRow.set("ip", i);
// The "atr" value is only attached when the input has a non-empty postal code.
if(row.get("postal_code") != null && !((String)row.get("postal_code")).isEmpty()){
System.out.println("This is getting written to logs");
// NOTE(review): re-parses end_ip on every iteration and, unlike the guarded parse
// above, has no null check, so a missing end_ip would throw here.
endIp = Long.parseLong((String)row.get("end_ip"));
// Build a one-element JSON array holding a single attribute object.
JSONArray atrArray = new JSONArray();
JSONObject atr = new JSONObject();
atr.put("id", "zippostal_code");
// "value" is itself an array containing just the postal code.
JSONArray atrValueArray = new JSONArray();
atr.put("value", atrValueArray.put((String)row.get("postal_code")));
atr.put("pr", 0.5);
// "dt" is the java.util.Date taken from the Calendar created at the top of this call.
atr.put("dt", cal.getTime());
atrArray.put(atr);
// NOTE(review): an org.json JSONArray is stored directly as the field value; the
// BigQuery sink presumably expects nested TableRow/List values for record fields,
// which would explain the empty column - confirm against the BigQueryIO docs.
outputRow.set("atr", atrArray);
}
c.output(outputRow);
}
}
}
}
My pipeline write step:
iPBlocksToIPData.apply("Foo", ParDo.of(new ProcessIpBlocks.IpBlocksToIp()))
.apply(BigQueryIO.Write
.named("WriteIPs")
.to(String.format("%1$s:%2$s.%3$s",projectId, eventDataset, ipBlocksToIpTable))
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
Upvotes: 1
Views: 1743
Reputation: 4472
The solution below worked: build the nested field with a TableRow instead of a JSONArray.
public class Foo {
public static class Foo extends DoFn<TableRow, TableRow> {
@Override
public void processElement(ProcessContext c) throws JSONException {
TableRow row = c.element();
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar cal = Calendar.getInstance();
long startIp = 0L, endIp = 0L;
if(row.get("start_ip") != null)
startIp = Long.parseLong((String)row.get("start_ip"));
if(row.get("end_ip") != null)
endIp = Long.parseLong((String)row.get("end_ip"));
for(long i= startIp; i<=endIp; i++)
{
TableRow outputRow = new TableRow();
outputRow.set("start_ip", startIp);
outputRow.set("ip", i);
if(row.get("postal_code") != null && !((String)row.get("postal_code")).isEmpty()){
endIp = Long.parseLong((String)row.get("end_ip"));
TableRow atrRow = new TableRow();
atrRow.set("id", "zippostal_code");
atrRow.set("value", new String[] {(String)row.get("postal_code")});
outputRow.set("atr", atrRow);
}
System.out.println(outputRow);
c.output(outputRow);
}
}
}
Upvotes: 5