arunK

Reputation: 418

DateTime datatype in BigQuery

I have a partitioned table where one of the columns is of type DateTime, and the table is partitioned on that same column. According to the spark-bigquery-connector documentation, the corresponding Spark SQL type is String. https://github.com/GoogleCloudDataproc/spark-bigquery-connector

I tried doing the same, but I am getting a datatype mismatch error.

Code Snippet:

ZonedDateTime nowPST = ZonedDateTime.ofInstant(Instant.now(), TimeZone.getTimeZone("PST").toZoneId());
df = df.withColumn("createdDate", lit(nowPST.toLocalDateTime().toString()));

Error:

Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Failed to load to <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME> in job JobId{project=<PROJECT_ID>, job=<JOB_ID>, location=US}. BigQuery error was Provided Schema does not match Table <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. Field createdDate has changed type from DATETIME to STRING
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.scala:156)
        at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:89)
        ... 36 more

Upvotes: 2

Views: 3050

Answers (3)

Carlos Gomez

Reputation: 230

After a lot of tests I was able to write a DATETIME field into BigQuery from Dataproc Serverless. The data was read from Oracle over JDBC.

The combination of parts that got the data written into BigQuery was:

  1. Use ORC as the intermediate format. To do this, add this config to the Spark session:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("Oracle JDBC to BigQuery via GCS") \
        .config("spark.jars", "gs://test-dataproc/jdbcdriver/ojdbc6.jar") \
        .config("spark.datasource.bigquery.intermediateFormat", "orc") \
        .getOrCreate()
    
  2. Use customSchema to read some fields as a specific type, and use fetchsize to improve read performance:

    df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "(SELECT * FROM table1 WHERE 1=1) alias") \
        .option("user", "username") \
        .option("password", "password") \
        .option("driver", "oracle.jdbc.OracleDriver") \
        .option("batchsize", "30000") \
        .option("fetchsize", "30000") \
        .option("customSchema", "FIELD1 string") \
        .load()
    
  3. Transform the field with the following statement (pay attention to the date format of your DataFrame, see https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html). In this example the data was in "yyyy-MM-dd HH:mm:ss.S" format and was translated to a date:

    from pyspark.sql import functions as f

    df_with_timestamp2 = df_with_timestamp.withColumn("date_trip", f.to_date(f.to_timestamp(f.col("date_trip"), "yyyy-MM-dd HH:mm:ss.S")))
    
  4. Example for a DATETIME field:

    df_with_timestamp = df.withColumn("date_delivery", f.to_timestamp(f.col("date_delivery"), "yyyy-MM-dd HH:mm:ss"))
    
  5. Finally, write to BigQuery:

    df_with_timestamp.write \
        .format("bigquery") \
        .option("table", "dataset.target_Table") \
        .option("temporaryGcsBucket", "bucket_temp_1/taget_table_folder") \
        .mode("append") \
        .save()
    

Upvotes: 0

Jnb2387

Reputation: 19

I am running into this issue now as well, with both GEOGRAPHY (https://community.databricks.com/s/question/0D58Y000099mPyDSAU/does-databricks-support-writing-geographygeometry-data-into-bigquery) and DATETIME types.

The only way I could get the table from Databricks to BigQuery (without creating a temporary table and inserting the data, which would still be costly due to the size of the table) was to write the table out as CSV to a GCS bucket:

results_df.write.format("csv").mode("overwrite").save("gs://<bucket-name>/ancillary_test")

Then load the data from the bucket into the BigQuery table, specifying the schema:

LOAD DATA INTO <dataset>.<tablename>(
    PRICENODEID INTEGER,
    ISONAME STRING,
    PRICENODENAME STRING,
    MARKETTYPE STRING,
    GMTDATETIME TIMESTAMP,
    TIMEZONE STRING,
    LOCALDATETIME DATETIME,
    ANCILLARY STRING,
    PRICE FLOAT64,
    CHANGE_DATE TIMESTAMP
)
FROM FILES (
  format = 'CSV',
  uris = ['gs://<bucket-name>/ancillary_test/*.csv']
);

Upvotes: -1

David Rabinowitz

Reputation: 30448

As Spark has no support for DateTime, the BigQuery connector does not support writing DateTime - there is no equivalent Spark data type that can be used. We are exploring ways to augment the DataFrame's metadata in order to support the types which are supported by BigQuery but not by Spark (DateTime, Time, Geography).

At the moment, please keep this field as a String and do the conversion on the BigQuery side.
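For example, assuming the DataFrame writes createdDate as a String in ISO format (which is what LocalDateTime.toString() produces) into a staging table, the conversion on the BigQuery side could look like this sketch (project, dataset, and table names are placeholders):

-- Copy rows from a hypothetical staging table, casting the ISO string to DATETIME
INSERT INTO `my_project.my_dataset.target_table` (createdDate)
SELECT CAST(createdDate AS DATETIME)
FROM `my_project.my_dataset.staging_table`;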

Upvotes: 2
