Reputation: 302
I have a DTO, CryptoNews, which contains
List<Currencies> currencies
I would like to save the "currencies" field to a SourceRecord when constructing it, but I can't figure out how.
My attempts end with this exception: Invalid Java object for schema type STRUCT: class com.dto.Currencies
Kafka Connect doesn't provide an explicit example of how to handle the case where an object in a List requires its own Schema. I also tried to apply an approach similar to the one in the Kafka test cases, but it doesn't work. https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98
kafka-connect-api version: 0.10.2.0-cp1
value and key converter: org.apache.kafka.connect.json.JsonConverter
no Avro is used
CryptoNews implements Serializable {
// omitted fields
private List<Currencies> currencies;
}
/**
 * A currency entry held in {@code CryptoNews}'s {@code currencies} list.
 *
 * <p>Implements {@link java.io.Serializable} because {@code CryptoNews} is {@code Serializable}
 * and holds a {@code List<Currencies>}: Java serialization throws
 * {@code NotSerializableException} for any non-serializable element reachable from it.
 */
class Currencies implements java.io.Serializable {
    private static final long serialVersionUID = 1L;

    private String code;
    private String title;
    private String slug;
    private String url;
}
SchemaConfiguration
public static final Integer FIRST_VERSION = 1;
public static final String CURRENCIES_SCHEMA_NAME = "currencies";

/**
 * Optional array whose elements are optional structs with the CODE_FIELD, TITLE_FIELD,
 * SLUG_FIELD and URL_FIELD string fields.
 *
 * <p>Declared before {@link #NEWS_SCHEMA}: a static field initializer may not read a static
 * field declared later in the same class by its simple name (illegal forward reference).
 *
 * <p>A value stored in a field with this schema must be a {@code List<Struct>} whose elements
 * are built with {@code CURRENCIES_SCHEMA.valueSchema()}, not the {@code Currencies} DTOs.
 */
public static final Schema CURRENCIES_SCHEMA = SchemaBuilder.array(
        SchemaBuilder.struct()
            .field(CODE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
            .optional()
            .build()
    )
    .optional()
    .name(CURRENCIES_SCHEMA_NAME)
    .version(FIRST_VERSION)
    .build();

/** Value schema for a news record; it embeds {@link #CURRENCIES_SCHEMA} as a field. */
public static final Schema NEWS_SCHEMA = SchemaBuilder.struct().name("News")
    .version(FIRST_VERSION)
    .field(CURRENCIES_SCHEMA_NAME, CURRENCIES_SCHEMA)
    // simple fields omitted for brevity.
    .build();
SourceTask
// Assemble the record with the nine-argument SourceRecord constructor. In that constructor's
// parameter order the arguments are: source partition, source offset, topic, Kafka partition,
// key schema, key, value schema, value, timestamp.
return new SourceRecord(
sourcePartition(),
sourceOffset(cryptoNews),
config.getString(TOPIC_CONFIG),
// Kafka partition: null here. NOTE(review): presumably leaves partition selection to the
// producer — confirm against the SourceRecord documentation.
null,
CryptoNewsSchema.NEWS_KEY_SCHEMA,
buildRecordKey(cryptoNews),
// The value must conform to NEWS_SCHEMA; its STRUCT-typed array field needs Struct elements,
// not the Currencies DTOs (otherwise: "Invalid Java object for schema type STRUCT").
CryptoNewsSchema.NEWS_SCHEMA,
buildRecordValue(cryptoNews),
// Record timestamp: current wall-clock time in epoch milliseconds.
Instant.now().toEpochMilli()
);
/**
 * Builds the Connect value for one news item, conforming to {@code CryptoNewsSchema.NEWS_SCHEMA}.
 *
 * <p>Connect validates every element of an ARRAY field against the array's value schema, and a
 * STRUCT element must be a {@link Struct}. Putting the {@code List<Currencies>} DTOs directly
 * failed with "Invalid Java object for schema type STRUCT: class com.dto.Currencies", so each
 * element is converted to a {@link Struct} first.
 *
 * @param cryptoNews the news item to convert
 * @return the record value; the currencies field is left unset when the news item has none
 */
public Struct buildRecordValue(CryptoNews cryptoNews){
    Struct valueStruct = new Struct(CryptoNewsSchema.NEWS_SCHEMA);
    List<Currencies> currencies = cryptoNews.getCurrencies();
    if (currencies != null) {
        List<Struct> currencyStructs = new java.util.ArrayList<>(currencies.size());
        for (Currencies currency : currencies) {
            // Null entries stay null; the element schema in CURRENCIES_SCHEMA is declared optional.
            currencyStructs.add(currency == null ? null : toCurrencyStruct(currency));
        }
        valueStruct.put(CurrenciesSchema.CURRENCIES_SCHEMA_NAME, currencyStructs);
    }
    return valueStruct;
}

/**
 * Converts one Currencies DTO into a Struct built with the element schema of the currencies
 * array inside NEWS_SCHEMA, so it passes Connect's per-element schema check.
 */
private Struct toCurrencyStruct(Currencies currency) {
    // NOTE(review): assumes the CODE_FIELD/TITLE_FIELD/SLUG_FIELD/URL_FIELD constants live in
    // CurrenciesSchema next to CURRENCIES_SCHEMA_NAME, and that Currencies exposes getters — confirm.
    Struct currencyStruct = new Struct(CryptoNewsSchema.NEWS_SCHEMA
            .field(CurrenciesSchema.CURRENCIES_SCHEMA_NAME)
            .schema()
            .valueSchema());
    currencyStruct.put(CurrenciesSchema.CODE_FIELD, currency.getCode());
    currencyStruct.put(CurrenciesSchema.TITLE_FIELD, currency.getTitle());
    currencyStruct.put(CurrenciesSchema.SLUG_FIELD, currency.getSlug());
    currencyStruct.put(CurrenciesSchema.URL_FIELD, currency.getUrl());
    return currencyStruct;
}
UPDATE:
worker.properties
bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true
rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000
Upvotes: 0
Views: 1151
Reputation: 191738
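You need to provide a List<Struct>: the array's value schema is a STRUCT, so every element must be a Struct built with that element schema, not the Currencies POJO itself.
Here's a full unit test example.
First, an interface that helps convert between POJOs and Structs: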
/**
 * Two-way mapping between a plain Java object and a Kafka Connect {@link Struct}.
 *
 * @param <T> the POJO type handled by this converter
 */
public interface ConnectPOJOConverter<T> {

    /** Returns the Connect schema that the Structs handled by this converter are expected to use. */
    Schema getSchema();

    /** Converts {@code t} into a {@link Struct}, expected to be built with {@link #getSchema()}. */
    Struct toConnectData(T t);

    /** Rebuilds a {@code T} from a {@link Struct} expected to conform to {@link #getSchema()}. */
    T fromConnectData(Struct s);
}
class ArrayStructTest {
    /** Connect schema for one currency item; used as the value schema of the "currencies" array. */
    public static final Schema CURRENCY_ITEM_SCHEMA = SchemaBuilder.struct()
            .version(1)
            .name(Currency.class.getName())
            .doc("A currency item")
            .field("code", Schema.OPTIONAL_STRING_SCHEMA)
            .field("title", Schema.OPTIONAL_STRING_SCHEMA)
            .field("slug", Schema.OPTIONAL_STRING_SCHEMA)
            .field("url", Schema.OPTIONAL_STRING_SCHEMA)
            .build();

    static final ConnectPOJOConverter<Currency> CONVERTER = new CurrencyConverter();

    @Test
    void myTest() {
        // Given
        List<Currency> currencies = new ArrayList<>();
        // TODO: Get from external source
        currencies.add(new Currency("200", "Hello", "/slug", "http://localhost"));
        currencies.add(new Currency("200", "World", "/slug", "http://localhost"));

        // When: build Connect Struct data
        Schema valueSchema = SchemaBuilder.struct()
                .name("CryptoNews")
                .doc("A record holding a list of currency items")
                .version(1)
                .field("currencies", SchemaBuilder.array(CURRENCY_ITEM_SCHEMA).required().build())
                .build();
        // Each array element must be a Struct built with CURRENCY_ITEM_SCHEMA; putting the Currency
        // POJOs themselves fails validation with "Invalid Java object for schema type STRUCT".
        final List<Struct> items = currencies.stream()
                .map(CONVERTER::toConnectData)
                .collect(Collectors.toList());
        // In the SourceTask, this is what goes into the SourceRecord along with the valueSchema
        Struct value = new Struct(valueSchema);
        value.put("currencies", items);

        // Then
        assertDoesNotThrow(value::validate);
        Object itemsFromStruct = value.get("currencies");
        assertInstanceOf(List.class, itemsFromStruct);
        //noinspection unchecked
        List<Object> data = (List<Object>) itemsFromStruct; // could also use List<Struct>
        assertEquals(2, data.size(), "same size");
        assertInstanceOf(Struct.class, data.get(0), "Object list still has type information");
        Struct firstStruct = (Struct) data.get(0);
        assertEquals("Hello", firstStruct.get("title"));

        List<Currency> roundTripped = data.stream()
                .map(o -> (Struct) o)
                .map(CONVERTER::fromConnectData)
                .filter(Objects::nonNull) // in case converter has errors, could return null
                .collect(Collectors.toList());
        assertTrue(roundTripped.size() <= data.size());
        assertEquals("World", roundTripped.get(1).getTitle(), "struct parsing data worked");
        // slug and url are both optional strings, so a swapped mapping still passes schema
        // validation; compare them explicitly against the originals.
        for (int i = 0; i < roundTripped.size(); i++) {
            assertEquals(currencies.get(i).getSlug(), roundTripped.get(i).getSlug(), "slug round-trips");
            assertEquals(currencies.get(i).getUrl(), roundTripped.get(i).getUrl(), "url round-trips");
        }
    }

    /** Maps {@link Currency} to and from {@link #CURRENCY_ITEM_SCHEMA} structs, field by field. */
    static class CurrencyConverter implements ConnectPOJOConverter<Currency> {
        @Override
        public Schema getSchema() {
            return CURRENCY_ITEM_SCHEMA;
        }

        @Override
        public Currency fromConnectData(Struct s) {
            // simple conversion, but more complex types could throw errors.
            // Argument order follows the Currency constructor as used in myTest:
            // (code, title, slug, url) — previously slug and url were passed swapped.
            return new Currency(
                    s.getString("code"),
                    s.getString("title"),
                    s.getString("slug"),
                    s.getString("url")
            );
        }

        @Override
        public Struct toConnectData(Currency c) {
            Struct s = new Struct(getSchema());
            s.put("code", c.getCode());
            s.put("title", c.getTitle());
            s.put("slug", c.getSlug());
            s.put("url", c.getUrl());
            return s;
        }
    }
}
An alternative approach is to use a String schema: serialize the object to a JSON string with
Jackson's ObjectMapper, then let the JsonConverter
handle the rest.
final ObjectMapper om = new ObjectMapper();
final Schema valueSchema = Schema.STRING_SCHEMA;
// NOTE(review): with value.converter=JsonConverter and schemas.enable=true (see worker.properties),
// a STRING_SCHEMA value is presumably wrapped again in the converter's own schema/payload envelope,
// leaving this hand-built envelope as an encoded string. Consider a StringConverter — confirm.
// for-each currency
// A fresh map per record; it must exist before anything is put into it.
Map<String, JsonNode> output = new HashMap<>();
output.put("schema", new TextNode("TODO")); // replace with JsonConverter schema
try {
    // valueToTree converts the POJO straight to a tree, so the payload is embedded as a JSON
    // object (not double-encoded) without a serialize-then-parse round trip.
    output.put("payload", om.valueToTree(currency));
    String value = om.writeValueAsString(output);
    SourceRecord r = new SourceRecord(...., valueSchema, value);
    records.add(r); // poll return result
} catch (IOException e) {
    // Fail loudly with the cause preserved; log and skip instead if dropping a record is acceptable.
    throw new org.apache.kafka.connect.errors.ConnectException("Failed to serialize currency to JSON", e);
}
// end for-each
return records;
Upvotes: 1