Nikhil Suthar

Reputation: 2431

How to handle NULL Value in BigQuery while writing through Dataflow?

I am ingesting data from one Database to BigQuery using the JdbcIO Source connector and BigQueryIO Sink connector provided by Apache Beam.

Below is my sample table data:

[Image: sample table data]

As we can see, a few columns such as id and booking_date contain NULL values. When I try to write the data into BigQuery, I get the error below:

"message": "Error while reading data, error message: JSON parsing error in row starting at position 0: Only optional fields can be set to NULL. Field: status; Value: NULL"

If I pass null for booking_date, I get an invalid date format error.

Below is the RowMapper I am using to convert the JdbcIO ResultSet into a TableRow. It is the same code that the GCP JdbcToBigQuery template uses.

public TableRow mapRow(ResultSet resultSet) throws Exception {
  ResultSetMetaData metaData = resultSet.getMetaData();
  TableRow outputTableRow = new TableRow();
  for (int i = 1; i <= metaData.getColumnCount(); i++) {
    if (resultSet.getObject(i) == null) {
      outputTableRow.set(getColumnRef(metaData, i), resultSet.getObject(i));
    // outputTableRow.set(getColumnRef(metaData, i), String.valueOf(resultSet.getObject(i)));
      continue;
    }

    /*
     * DATE:      EPOCH MILLISECONDS -> yyyy-MM-dd
     * DATETIME:  EPOCH MILLISECONDS -> yyyy-MM-dd hh:mm:ss.SSSSSS
     * TIMESTAMP: EPOCH MILLISECONDS -> yyyy-MM-dd hh:mm:ss.SSSSSSXXX
     *
     * MySQL drivers have ColumnTypeName in all caps and postgres in small case
     */
    switch (metaData.getColumnTypeName(i).toLowerCase()) {
      case "date":
        outputTableRow.set(
            getColumnRef(metaData, i), dateFormatter.format(resultSet.getDate(i)));
        break;
      case "datetime":
        outputTableRow.set(
            getColumnRef(metaData, i),
            datetimeFormatter.format((TemporalAccessor) resultSet.getObject(i)));
        break;
      case "timestamp":
        outputTableRow.set(
            getColumnRef(metaData, i), timestampFormatter.format(resultSet.getTimestamp(i)));
        break;
      case "clob":
        Clob clobObject = resultSet.getClob(i);
        if (clobObject.length() > Integer.MAX_VALUE) {
          LOG.warn(
              "The Clob value size {} in column {} exceeds 2GB and will be truncated.",
              clobObject.length(),
              getColumnRef(metaData, i));
        }
        outputTableRow.set(
            getColumnRef(metaData, i), clobObject.getSubString(1, (int) clobObject.length()));
        break;
      default:
        outputTableRow.set(getColumnRef(metaData, i), resultSet.getObject(i).toString());
    }
  }

  return outputTableRow;
}

For more details, see the JdbcToBigQuery template.

Solution I tried but did not get success

How can I handle all NULL cases? Please note that I cannot simply skip these rows, since some of their columns do contain values.

Upvotes: 1

Views: 621

Answers (1)

Mazlum Tosun

Reputation: 6572

To solve your issue, pass null when the value is null, and set the associated BigQuery columns to NULLABLE:

public TableRow mapRow(ResultSet resultSet) throws Exception {
  ResultSetMetaData metaData = resultSet.getMetaData();
  TableRow outputTableRow = new TableRow();
  for (int i = 1; i <= metaData.getColumnCount(); i++) {
    // Pass SQL NULL through as null; the target BigQuery column must be NULLABLE.
    if (resultSet.getObject(i) == null) {
      outputTableRow.set(getColumnRef(metaData, i), null);
      continue;
    }

    /*
     * DATE:      EPOCH MILLISECONDS -> yyyy-MM-dd
     * DATETIME:  EPOCH MILLISECONDS -> yyyy-MM-dd hh:mm:ss.SSSSSS
     * TIMESTAMP: EPOCH MILLISECONDS -> yyyy-MM-dd hh:mm:ss.SSSSSSXXX
     *
     * MySQL drivers have ColumnTypeName in all caps and postgres in small case
     */
    switch (metaData.getColumnTypeName(i).toLowerCase()) {
      case "date":
        // Format only when a value is present; otherwise keep null.
        String date = Optional.ofNullable(resultSet.getDate(i))
            .map(d -> dateFormatter.format(d))
            .orElse(null);
        outputTableRow.set(getColumnRef(metaData, i), date);
        break;
      case "datetime":
        String datetime = Optional.ofNullable(resultSet.getObject(i))
            .map(d -> datetimeFormatter.format((TemporalAccessor) d))
            .orElse(null);
        outputTableRow.set(getColumnRef(metaData, i), datetime);
        break;
      case "timestamp":
        String timestamp = Optional.ofNullable(resultSet.getTimestamp(i))
            .map(t -> timestampFormatter.format(t))
            .orElse(null);
        outputTableRow.set(getColumnRef(metaData, i), timestamp);
        break;
      case "clob":
        Clob clobObject = resultSet.getClob(i);
        if (clobObject.length() > Integer.MAX_VALUE) {
          LOG.warn(
              "The Clob value size {} in column {} exceeds 2GB and will be truncated.",
              clobObject.length(),
              getColumnRef(metaData, i));
        }
        outputTableRow.set(
            getColumnRef(metaData, i), clobObject.getSubString(1, (int) clobObject.length()));
        break;
      default:
        outputTableRow.set(getColumnRef(metaData, i), resultSet.getObject(i).toString());
    }
  }

  return outputTableRow;
}
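
The NULLABLE requirement mentioned at the top of this answer can be illustrated with a schema definition. This is a minimal sketch, assuming the schema is supplied as a JSON string (BigQueryIO.Write accepts one through withJsonSchema). The class and method names are hypothetical, and the column types for id, status, and booking_date are guesses, since the question does not state them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Beam or the JdbcToBigQuery template.
class NullableSchemaSketch {

  /** Builds a JSON-serialized table schema in which every field has mode NULLABLE. */
  static String nullableSchemaJson(Map<String, String> columnTypes) {
    StringJoiner fields = new StringJoiner(",", "{\"fields\":[", "]}");
    for (Map.Entry<String, String> column : columnTypes.entrySet()) {
      fields.add(String.format(
          "{\"name\":\"%s\",\"type\":\"%s\",\"mode\":\"NULLABLE\"}",
          column.getKey(), column.getValue()));
    }
    return fields.toString();
  }

  public static void main(String[] args) {
    // Column types below are assumptions made for illustration only.
    Map<String, String> columns = new LinkedHashMap<>();
    columns.put("id", "INTEGER");
    columns.put("status", "STRING");
    columns.put("booking_date", "DATE");
    System.out.println(nullableSchemaJson(columns));
  }
}
```

If the destination table already exists with REQUIRED columns, a schema passed from the pipeline will not change that; the column modes would have to be relaxed to NULLABLE on the BigQuery side.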

In the mapRow code above, the date, datetime, and timestamp cases apply the formatter only when the value is not null; otherwise they pass null through to the TableRow.
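
The Optional pattern can be tried in isolation, without a database or Beam on the classpath. This is a rough sketch using only the JDK; the class, method, and formatter names are made up for the example, and it uses java.time formatters, which may differ from the formatter fields defined in the template.

```java
import java.sql.Date;
import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;
import java.util.Optional;

// Hypothetical standalone helper showing null-safe formatting of JDBC values.
class NullSafeFormatSketch {
  private static final DateTimeFormatter DATE_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd");
  private static final DateTimeFormatter TIMESTAMP_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

  /** Returns the formatted date, or null when the JDBC value is null. */
  static String formatDateOrNull(Date value) {
    return Optional.ofNullable(value)
        .map(d -> DATE_FORMAT.format(d.toLocalDate()))
        .orElse(null);
  }

  /** Returns the formatted timestamp, or null when the JDBC value is null. */
  static String formatTimestampOrNull(Timestamp value) {
    return Optional.ofNullable(value)
        .map(t -> TIMESTAMP_FORMAT.format(t.toLocalDateTime()))
        .orElse(null);
  }

  public static void main(String[] args) {
    System.out.println(formatDateOrNull(Date.valueOf("2023-01-15")));
    System.out.println(formatDateOrNull(null));
    System.out.println(formatTimestampOrNull(Timestamp.valueOf("2023-01-15 10:20:30")));
  }
}
```

A null input never reaches the formatter, so no NullPointerException is thrown and the caller can set the returned null directly on the row.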

Upvotes: 2
