Eric Lee

Reputation: 710

Is there a way to match an Avro schema between BigQuery and Bigtable?

I'd like to import BigQuery data into Bigtable using Google Composer.

Exporting BigQuery rows in Avro format to GCS was successful. However, importing the Avro data into Bigtable was not.

The error says

Caused by: org.apache.avro.AvroTypeException: Found Root, expecting com.google.cloud.teleport.bigtable.BigtableRow, missing required field key

I guess the schemas for BigQuery and Bigtable need to match each other, but I have no idea how to do this.

Upvotes: 1

Views: 1153

Answers (2)

mitbal

Reputation: 355

For those of you who, like me, still have this problem because you are not familiar with Avro, here is one working schema transformation that I found after some tinkering.

For example, suppose you have a table in BigQuery like the example table shown above, with a user_id column and feature columns such as channel, zip_code, and history.

If you want to use user_id as the Bigtable row key and ingest all columns, here is example code to encode them into an Avro file.

import json
from datetime import datetime

from avro.schema import Parse
from avro.io import DatumWriter
from avro.datafile import DataFileWriter

# Avro schema expected by the Cloud Storage Avro to Bigtable Dataflow template
bigtable_schema = {
    "name" : "BigtableRow",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "key", "type" : "bytes"},
      { "name" : "cells",
        "type" : {
          "type" : "array",
          "items": {
            "name": "BigtableCell",
            "type": "record",
            "fields": [
              { "name" : "family", "type" : "string"},
              { "name" : "qualifier", "type" : "bytes"},
              { "name" : "timestamp", "type" : "long", "logicalType" : "timestamp-micros"},
              { "name" : "value", "type" : "bytes"}
            ]
          }
        }
      }
    ]
}

parsed_schema = Parse(json.dumps(bigtable_schema))

row_key = 'user_id'
family_name = 'feature_name'
feature_list = ['channel', 'zip_code', 'history']

# df is a pandas DataFrame holding the BigQuery table (one row per user)
with open('features.avro', 'wb') as f:

    writer = DataFileWriter(f, DatumWriter(), parsed_schema)

    for item in df.iterrows():

        row = item[1]
        # current time in microseconds, to match the timestamp-micros logical type
        ts = int(datetime.now().timestamp()) * 1000 * 1000

        # write one single-cell BigtableRow record per feature column
        for feat in feature_list:

            writer.append({
                "key": row[row_key].encode('utf-8'),
                "cells": [{"family": family_name,
                           "qualifier": feat.encode('utf-8'),
                           "timestamp": ts,
                           "value": str(row[feat]).encode('utf-8')}]
            })

    writer.close()
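
In the snippet above, df is assumed to be a pandas DataFrame holding the example BigQuery table. As a minimal sketch (the project, dataset, and table names are placeholders), it could be loaded with the google-cloud-bigquery client like this:

from google.cloud import bigquery

client = bigquery.Client()

# Placeholder table reference; replace with your own project.dataset.table
query = """
SELECT user_id, channel, zip_code, history
FROM `my-project.my_dataset.user_features`
"""

df = client.query(query).to_dataframe()  # requires pandas (and pyarrow or db-dtypes)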

Then you can use the Cloud Storage Avro to Bigtable Dataflow template job to run the ingestion.
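
As a rough sketch, that template can be launched from Python with the google-api-python-client Dataflow service; the project, region, bucket, instance, and table names below are placeholders, and the parameter names follow the Cloud Storage Avro to Bigtable template documentation linked in the other answer:

from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3')

# Launch the provided GCS Avro to Cloud Bigtable template
request = dataflow.projects().locations().templates().launch(
    projectId='my-project',
    location='us-central1',
    gcsPath='gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Bigtable',
    body={
        'jobName': 'avro-to-bigtable',
        'parameters': {
            'bigtableProjectId': 'my-project',
            'bigtableInstanceId': 'my-instance',
            'bigtableTableId': 'my-table',
            'inputFilePattern': 'gs://my-bucket/features.avro',
        },
    },
)
response = request.execute()
print(response)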

Complete code can be found here: https://github.com/mitbal/sidu/blob/master/bigquery_to_bigtable.ipynb

Upvotes: 1

Lakshmi

Reputation: 195

For every record read from the Avro files:

  • Attributes present in the files and in the table are loaded into the table.
  • Attributes present in the file but not in the table are subject to ignore_unknown_fields.
  • Attributes that exist in the table but not in the file will use their default value, if there is one set.

The links below are helpful.

[1] https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#cloud-storage-avro-to-bigtable

[2] https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/resources/schema/avro/bigtable.avsc

[3] Avro to BigTable - Schema issue?

Upvotes: 2
