Chitral Verma

Reputation: 2853

Creating complex BigQuery Schema in Google DataFlow (java)

I have an unbounded stream of complex objects that I want to load into BigQuery. The structure of these objects represents the schema of my destination table in BigQuery.

The problem is that since there are a lot of nested fields in the POJO, it's an extremely tedious task to convert it to a TableSchema object by hand, and I'm looking for a quick/automated way to do this conversion while writing to BigQuery.

I'm not very familiar with the Apache Beam API, and any help will be appreciated.
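
For example, for a POJO like the simplified, hypothetical Customer/Address below, the manual conversion already looks like this, and my real objects are far more nested:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

class Address {
    public String street;
    public String city;
}

class Customer {
    public String name;
    public Integer age;
    public Address address; // nested POJO -> RECORD field in BigQuery
}

class ManualSchemaExample {
    // Hand-written schema mirroring Customer; already tedious at two levels of nesting.
    static TableSchema customerSchema() {
        return new TableSchema().setFields(Arrays.asList(
                new TableFieldSchema().setName("name").setType("STRING"),
                new TableFieldSchema().setName("age").setType("INT64"),
                new TableFieldSchema().setName("address").setType("RECORD")
                        .setFields(Arrays.asList(
                                new TableFieldSchema().setName("street").setType("STRING"),
                                new TableFieldSchema().setName("city").setType("STRING")))));
    }
}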

Upvotes: 0

Views: 1310

Answers (3)

Juan Urrego

Reputation: 353

In my company I created a kind of ORM (we call it OBQM) to do this. We are expecting to release it to the public. The code is quite big (especially because I created annotations and so on), but I can share some snippets with you for quick schema generation:

public TableSchema generateTableSchema(@Nonnull final Class<?> cls) {
    final TableSchema tableSchema = new TableSchema();
    tableSchema.setFields(generateFieldsSchema(cls));

    return tableSchema;
}

public List<TableFieldSchema> generateFieldsSchema(@Nonnull final Class<?> cls) {
    final List<TableFieldSchema> schemaFields = new ArrayList<>();
    final Field[] clsFields = cls.getFields();

    for (final Field field : clsFields) {
        schemaFields.add(fromFieldToSchemaField(field));
    }

    return schemaFields;
}

This code takes all the fields from the POJO class and creates a TableSchema object (the one that BigQueryIO uses in Apache Beam). You can see a method I created called fromFieldToSchemaField. This method identifies each field's type and sets up the field name, mode, description and type. To keep it simple here, I'm going to focus on the type and name:

public static TableFieldSchema fromFieldToSchemaField(@Nonnull final Field field) {
    return fromFieldToSchemaField(field, 0);
}

public static TableFieldSchema fromFieldToSchemaField(
        @Nonnull final Field field,
        final int iteration) {

    final TableFieldSchema schemaField = new TableFieldSchema();

    schemaField.setName(field.getName());
    schemaField.setMode("NULLABLE"); // You can add better logic here; we use annotations to override this value
    schemaField.setType(getFieldTypeString(field));
    schemaField.setDescription("Optional"); // Optional

    if (iteration < MAX_RECURSION
            && (isStruct(schemaField.getType())
                    || isRecord(schemaField.getType()))) {
        final List<TableFieldSchema> schemaFields = new ArrayList<>();
        final Field[] fields = getFieldsFromComplexObjectField(field);

        for (final Field subField : fields) {
            schemaFields.add(fromFieldToSchemaField(subField, iteration + 1));
        }

        schemaField.setFields(schemaFields.isEmpty() ? null : schemaFields);
    }

    return schemaField;
}
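
The helpers used above (MAX_RECURSION, isStruct, isRecord and getFieldsFromComplexObjectField) are not part of the snippet. A minimal sketch of what they could look like, assuming MAX_RECURSION is just a depth guard and sub-fields are obtained by reflecting into the field's class (the depth value is an assumption):

private static final int MAX_RECURSION = 5; // guard against deep or cyclic POJOs

public static boolean isStruct(final String type) {
    return "STRUCT".equals(type);
}

public static boolean isRecord(final String type) {
    return "RECORD".equals(type);
}

// Reflect into the field's declared class to obtain its public sub-fields.
public static Field[] getFieldsFromComplexObjectField(final Field field) {
    return field.getType().getFields();
}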

And here is the method that returns the BigQuery field type:

public static String getFieldTypeString(@Nonnull final Field field) {
    // On my side this code is more complex, but this is a short version of it
    final Class<?> cls = field.getType();

    if (cls.isAssignableFrom(String.class)) {
        return "STRING";
    } else if (cls.isAssignableFrom(Integer.class) || cls.isAssignableFrom(Short.class)) {
        return "INT64";
    } else if (cls.isAssignableFrom(Double.class)) {
        return "NUMERIC";
    } else if (cls.isAssignableFrom(Float.class)) {
        return "FLOAT64";
    } else if (cls.isAssignableFrom(Boolean.class)) {
        return "BOOLEAN";
    } else if (cls.isAssignableFrom(byte[].class)) { // assuming raw byte arrays map to BYTES
        return "BYTES";
    } else if (cls.isAssignableFrom(Date.class)
            || cls.isAssignableFrom(DateTime.class)) {
        return "TIMESTAMP";
    } else {
        return "STRUCT";
    }
}

Keep in mind that I'm not showing how to identify primitive types or arrays, but this is a good start for your code :). Please let me know if you need any help.
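
For the pieces the answer leaves out, here is a hedged sketch of one way primitives and arrays could be handled. This is not the author's code; it assumes Java arrays map to BigQuery's REPEATED mode and follows the same type mappings as the snippets above:

// Hypothetical helper: primitives reuse the wrapper-class mappings,
// and arrays become REPEATED fields typed on their component class.
public static TableFieldSchema fromPrimitiveOrArrayField(@Nonnull final Field field) {
    final TableFieldSchema schemaField = new TableFieldSchema();
    Class<?> cls = field.getType();

    schemaField.setName(field.getName());

    if (cls.isArray()) {
        // BigQuery has no array type as such; a repeated field of the component type is used.
        schemaField.setMode("REPEATED");
        cls = cls.getComponentType();
    } else {
        schemaField.setMode("NULLABLE");
    }

    if (cls == int.class || cls == short.class || cls == long.class) {
        schemaField.setType("INT64");
    } else if (cls == double.class) {
        schemaField.setType("NUMERIC");
    } else if (cls == float.class) {
        schemaField.setType("FLOAT64");
    } else if (cls == boolean.class) {
        schemaField.setType("BOOLEAN");
    }

    return schemaField;
}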

Upvotes: 1

guillaume blaquiere

Reputation: 76010

In a pipeline, I load a list of schemas from GCS. I keep them in string format because TableSchema is not serializable. However, I parse them into TableSchema objects to validate them. Then I add them, in string format, to a map in the Options object.

String schema = new String(blob.getContent());
// Wrap the list of fields so it parses correctly as a TableSchema
String targetSchema = "{\"fields\":" + schema + "}";
try {
    //Preload schema to ensure validity, but then use string version
    Transport.getJsonFactory().fromString(targetSchema, TableSchema.class);

    String tableName = blob.getName().replace(SCHEMA_FILE_PREFIX, "").replace(SCHEMA_FILE_SUFFIX, "");
    tableSchemaStringMap.put(tableName, targetSchema);
} catch (IOException e) {
    logger.warn("impossible to read schema " + blob.getName() + " in bucket gs://" + options.getSchemaBucket());
}

I didn't find another solution when I developed this.
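
If it helps, one way to consume those stored strings later in the pipeline is Beam's withJsonSchema, which accepts exactly this kind of JSON string and sidesteps the serializability issue. A sketch, where the table name, table spec and the tableRows PCollection are placeholders:

// Sketch: feed the stored JSON schema string back to BigQueryIO at write time.
String jsonSchema = tableSchemaStringMap.get("my_table"); // placeholder table name

tableRows.apply("WriteToBigQuery",
        BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table") // placeholder table spec
                .withJsonSchema(jsonSchema) // the string form is serializable, unlike TableSchema
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));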

Upvotes: 1

Reza Rokni

Reputation: 1256

If you're using JSON for the message serialization in PubSub, you can make use of one of the provided templates:

PubSub To BigQuery Template

The code for that template is here:

PubSubToBigQuery.java

Upvotes: 0
