Reputation: 2853
I have an unbounded stream of complex objects that I want to load into BigQuery. The structure of these objects represents the schema of my destination table in BigQuery.
The problem is that, since there are a lot of nested fields in the POJO, it's an extremely tedious task to convert it to a TableSchema object, and I'm looking for a quick/automated way to convert my POJO to a TableSchema object while writing to BigQuery.
I'm not very familiar with the Apache Beam API, and any help will be appreciated.
Upvotes: 0
Views: 1310
Reputation: 353
At my company I created a kind of ORM (we called it OBQM) to do this. We are expecting to release it to the public. The code is quite big (especially because I created annotations and so on), but I can share some snippets with you for quick schema generation:
public TableSchema generateTableSchema(@Nonnull final Class<?> cls) {
    final TableSchema tableSchema = new TableSchema();
    tableSchema.setFields(generateFieldsSchema(cls));
    return tableSchema;
}

public List<TableFieldSchema> generateFieldsSchema(@Nonnull final Class<?> cls) {
    final List<TableFieldSchema> schemaFields = new ArrayList<>();
    final Field[] clsFields = cls.getFields();
    for (final Field field : clsFields) {
        schemaFields.add(fromFieldToSchemaField(field));
    }
    return schemaFields;
}
This code takes all the public fields from the POJO class and creates a TableSchema object (the one that BigQueryIO uses in Apache Beam). You can see a method that I created called fromFieldToSchemaField. This method identifies each field's type and sets up the field name, mode, description and type. In this case, to keep it simple, I'm going to focus on the type and name:
public static TableFieldSchema fromFieldToSchemaField(@Nonnull final Field field) {
    return fromFieldToSchemaField(field, 0);
}

public static TableFieldSchema fromFieldToSchemaField(
        @Nonnull final Field field,
        final int iteration) {
    final TableFieldSchema schemaField = new TableFieldSchema();
    schemaField.setName(field.getName());
    schemaField.setMode("NULLABLE"); // You can add better logic here, we use annotations to override this value
    schemaField.setType(getFieldTypeString(field));
    schemaField.setDescription("Optional"); // Optional
    // MAX_RECURSION caps the nesting depth so cyclic object graphs don't recurse forever
    if (iteration < MAX_RECURSION
            && (isStruct(schemaField.getType())
                || isRecord(schemaField.getType()))) {
        final List<TableFieldSchema> schemaFields = new ArrayList<>();
        final Field[] fields = getFieldsFromComplexObjectField(field);
        for (final Field subField : fields) {
            schemaFields.add(fromFieldToSchemaField(subField, iteration + 1));
        }
        schemaField.setFields(schemaFields.isEmpty() ? null : schemaFields);
    }
    return schemaField;
}
And now the method that returns the BigQuery field type.
public static String getFieldTypeString(@Nonnull final Field field) {
    // On my side this code is much more complex, but this is a short version of it
    final Class<?> cls = (Class<?>) field.getGenericType();
    if (cls.isAssignableFrom(String.class)) {
        return "STRING";
    } else if (cls.isAssignableFrom(Integer.class) || cls.isAssignableFrom(Short.class)) {
        return "INT64";
    } else if (cls.isAssignableFrom(Double.class)) {
        return "NUMERIC";
    } else if (cls.isAssignableFrom(Float.class)) {
        return "FLOAT64";
    } else if (cls.isAssignableFrom(Boolean.class)) {
        return "BOOLEAN";
    } else if (cls.isAssignableFrom(byte[].class)) {
        return "BYTES";
    } else if (cls.isAssignableFrom(Date.class)
            || cls.isAssignableFrom(DateTime.class)) {
        return "TIMESTAMP";
    } else {
        return "STRUCT";
    }
}
Keep in mind that I'm not showing how to identify primitive types or arrays. But this is a good start for your code :). Please let me know if you need any help.
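For the array case specifically, a minimal sketch could look like the following. This is illustrative only, not part of the OBQM code above: it assumes a Class-based overload of getFieldTypeString (the version shown above takes a Field), and it only handles plain arrays and List fields.

import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.util.List;

// Hypothetical sketch: mark array/List fields as REPEATED and map the element type.
public static TableFieldSchema fromArrayFieldToSchemaField(@Nonnull final Field field) {
    final TableFieldSchema schemaField = new TableFieldSchema();
    schemaField.setName(field.getName());

    Class<?> elementType = field.getType();
    if (elementType.isArray()) {
        // e.g. String[] -> String
        elementType = elementType.getComponentType();
        schemaField.setMode("REPEATED");
    } else if (List.class.isAssignableFrom(elementType)) {
        // e.g. List<String> -> String, taken from the generic type parameter
        final ParameterizedType listType = (ParameterizedType) field.getGenericType();
        elementType = (Class<?>) listType.getActualTypeArguments()[0];
        schemaField.setMode("REPEATED");
    } else {
        schemaField.setMode("NULLABLE");
    }
    // Assumes an overload that maps a Class (not a Field) to a BigQuery type string
    schemaField.setType(getFieldTypeString(elementType));
    return schemaField;
}

Once you have the schema, it plugs into the sink in the usual way, e.g. BigQueryIO.writeTableRows().to(tableReference).withSchema(generateTableSchema(MyPojo.class)), where MyPojo is your own class.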
Upvotes: 1
Reputation: 76010
In a pipeline, I load a list of schemas from GCS. I keep them in string format because TableSchema is not serializable. However, I parse them into TableSchema objects to validate them. Then I add them, in string format, to a map in the Options object.
String schema = new String(blob.getContent());
// Decorate the list of fields to allow correct parsing
String targetSchema = "{\"fields\":" + schema + "}";
try {
    // Preload the schema to ensure validity, but then use the string version
    Transport.getJsonFactory().fromString(targetSchema, TableSchema.class);
    String tableName = blob.getName().replace(SCHEMA_FILE_PREFIX, "").replace(SCHEMA_FILE_SUFFIX, "");
    tableSchemaStringMap.put(tableName, targetSchema);
} catch (IOException e) {
    logger.warn("impossible to read schema " + blob.getName() + " in bucket gs://" + options.getSchemaBucket());
}
I didn't find another solution when I developed this.
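For completeness, here is roughly how such a string-keyed map can be consumed at write time (the names rows, projectId, datasetId and tableName are placeholders, not part of the snippet above): BigQueryIO accepts a JSON-serialized schema directly, so the TableSchema object itself never has to cross worker boundaries.

// Minimal sketch: withJsonSchema takes the JSON-serialized TableSchema,
// i.e. exactly the "{\"fields\": [...]}" string stored in the map above.
String jsonSchema = tableSchemaStringMap.get(tableName); // placeholder lookup
rows.apply("WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to(String.format("%s:%s.%s", projectId, datasetId, tableName))
        .withJsonSchema(jsonSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));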
Upvotes: 1
Reputation: 1256
If you're using JSON for the message serialization in PubSub, you can make use of one of the provided templates:
The code for that template is here:
Upvotes: 0