leonaugust

Reputation: 302

Kafka Connect: how to handle a List of custom objects when specifying the schema and building the SourceRecord value

I have a DTO, CryptoNews, which contains:

List<Currencies> currencies

I would like to save the "currencies" field to the SourceRecord when constructing it, but I can't figure out how to:

  1. Declare it in schema.
  2. Pass it to Struct object when building value.

My attempts end with this exception: Invalid Java object for schema type STRUCT: class com.dto.Currencies

Kafka Connect doesn't provide an explicit example of how to handle the case where an object in a List requires its own Schema. I also tried to apply an approach similar to the one in the Kafka test cases, but it doesn't work. https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98

How to do this?

kafka-connect-api version: 0.10.2.0-cp1
value and key converter: org.apache.kafka.connect.json.JsonConverter
No Avro is used.


CryptoNews implements Serializable {

   // omitted fields

    private List<Currencies> currencies;

}

/**
 * One element of the {@code currencies} list held by {@code CryptoNews}.
 *
 * <p>Implements {@link java.io.Serializable} because instances are held by the
 * {@code Serializable} {@code CryptoNews}. Without it, Java serialization of a news item
 * whose currency list is non-empty fails with {@code NotSerializableException}.
 *
 * <p>Note that Kafka Connect cannot store this POJO directly in a STRUCT field. Each
 * instance must first be copied into a {@code Struct} built with the element schema.
 */
class Currencies implements java.io.Serializable {

    private String code;
    private String title;
    private String slug;
    private String url;

}

SchemaConfiguration

/** Schema version shared by the news schema and the currencies array schema. */
public static final Integer FIRST_VERSION = 1;
/** Field name in the news value, also used as the name of the currencies array schema. */
public static final String CURRENCIES_SCHEMA_NAME = "currencies";

/**
 * Optional STRUCT describing one currency (code, title, slug, url; all optional strings).
 *
 * <p>Exposed on its own so callers can build element values with
 * {@code new Struct(CURRENCY_ITEM_SCHEMA)}. The array field accepts only {@code Struct}
 * elements, not {@code Currencies} DTOs.
 */
public static final Schema CURRENCY_ITEM_SCHEMA = SchemaBuilder.struct()
        .field(CODE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(TITLE_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(SLUG_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .field(URL_FIELD, Schema.OPTIONAL_STRING_SCHEMA)
        .optional()
        .build();

/**
 * Optional ARRAY of {@link #CURRENCY_ITEM_SCHEMA} elements.
 *
 * <p>Declared before {@link #NEWS_SCHEMA}: a static field initializer may not read another
 * static field by simple name before that field is declared (illegal forward reference).
 */
public static final Schema CURRENCIES_SCHEMA = SchemaBuilder.array(CURRENCY_ITEM_SCHEMA)
        .optional()
        .name(CURRENCIES_SCHEMA_NAME)
        .version(FIRST_VERSION)
        .build();

/** Value schema of a news record; the currency list is stored under {@code "currencies"}. */
public static final Schema NEWS_SCHEMA = SchemaBuilder.struct().name("News")
        .version(FIRST_VERSION)
        .field(CURRENCIES_SCHEMA_NAME, CURRENCIES_SCHEMA)
        // simple fields omitted for brevity.
        .build();

SourceTask

// Builds the record for a single CryptoNews item.
// NOTE(review): the argument order is presumably (source partition, source offset, topic,
// Kafka partition, key schema, key, value schema, value, timestamp). Confirm against the
// SourceRecord constructor of the Kafka version in use.
return new SourceRecord(
                sourcePartition(),
                sourceOffset(cryptoNews),
                config.getString(TOPIC_CONFIG),
                null, // presumably the Kafka partition; null leaves the choice to the producer — TODO confirm
                CryptoNewsSchema.NEWS_KEY_SCHEMA,
                buildRecordKey(cryptoNews),
                CryptoNewsSchema.NEWS_SCHEMA,
                buildRecordValue(cryptoNews), // a Struct created from NEWS_SCHEMA
                Instant.now().toEpochMilli() // record timestamp in epoch milliseconds
        );

 
 
/**
 * Builds the Connect value for one news item, using {@code CryptoNewsSchema.NEWS_SCHEMA}.
 *
 * <p>This is the failing attempt from the question. It is kept as-is to show the error.
 *
 * @param cryptoNews the news item to convert
 * @return a Struct for NEWS_SCHEMA; the currencies field is set only when the list is non-null
 */
public Struct buildRecordValue(CryptoNews cryptoNews){
        Struct valueStruct = new Struct(CryptoNewsSchema.NEWS_SCHEMA);
 
        // Produces Invalid Java object for schema type STRUCT: class com.dto.Currencies
        // Why: the "currencies" field is an ARRAY whose element schema is a STRUCT. Each list
        // element must therefore be a Struct built with that element schema. Passing the raw
        // List<Currencies> hands Connect plain DTOs where it expects Struct instances.
        List<Currencies> currencies = cryptoNews.getCurrencies();
        if (currencies != null) {
            valueStruct.put(CurrenciesSchema.CURRENCIES_SCHEMA_NAME, currencies);
        }
 
        return valueStruct;
}

UPDATE:

worker.properties

bootstrap.servers=localhost:29092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=true

rest.port=8086
rest.host.name=127.0.0.1
offset.storage.file.filename=offsets/standalone.offsets
offset.flush.interval.ms=10000

Upvotes: 0

Views: 1151

Answers (1)

OneCricketeer

Reputation: 191738

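You need to provide a List<Struct>. The field's schema is an ARRAY of STRUCT, so every element must be a Struct built with the element schema, not your POJO.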

Here's a full unit test example

First, a helper interface:

/**
 * Two-way mapping between a plain Java object and its Kafka Connect {@link Struct} form.
 *
 * @param <T> the POJO type handled by an implementation
 */
public interface ConnectPOJOConverter<T> {

  /**
   * Converts a POJO into a Connect Struct.
   *
   * @param t the object to convert
   * @return its Struct representation
   */
  Struct toConnectData(T t);

  /**
   * Converts a Connect Struct back into a POJO.
   *
   * @param s the Struct to read
   * @return the rebuilt object
   */
  T fromConnectData(Struct s);

  /**
   * Returns the Connect schema of the Struct form of {@code T}.
   *
   * @return the element schema
   */
  Schema getSchema();
}

/**
 * Shows how to store a list of POJOs in a Kafka Connect ARRAY-of-STRUCT field.
 *
 * <p>Each POJO is converted into a {@link Struct} built with the element schema before the
 * list is put into the value Struct. The Structs are then read back and converted to POJOs.
 */
class ArrayStructTest {

  /** Element schema for one currency; the array field of the value schema wraps it. */
  public static final Schema CURRENCY_ITEM_SCHEMA = SchemaBuilder.struct()
      .version(1)
      .name(Currency.class.getName())
      .doc("A currency item")
      .field("code", Schema.OPTIONAL_STRING_SCHEMA)
      .field("title", Schema.OPTIONAL_STRING_SCHEMA)
      .field("slug", Schema.OPTIONAL_STRING_SCHEMA)
      .field("url", Schema.OPTIONAL_STRING_SCHEMA)
      .build();

  static final ConnectPOJOConverter<Currency> CONVERTER = new CurrencyConverter();

  @Test
  void myTest() {
    // Given
    List<Currency> currencies = new ArrayList<>();
    // TODO: Get from external source
    currencies.add(new Currency("200", "Hello", "/slug", "http://localhost"));
    currencies.add(new Currency("200", "World", "/slug", "http://localhost"));

    // When: build Connect Struct data
    Schema valueSchema = SchemaBuilder.struct()
        .name("CryptoNews")
        .doc("A record holding a list of currency items")
        .version(1)
        .field("currencies", SchemaBuilder.array(CURRENCY_ITEM_SCHEMA).required().build())
        .build();
    // The array's element schema is a STRUCT, so each POJO must become a Struct first.
    final List<Struct> items = currencies.stream()
        .map(CONVERTER::toConnectData)
        .collect(Collectors.toList());
    // In the SourceTask, this is what goes into the SourceRecord along with the valueSchema
    Struct value = new Struct(valueSchema);
    value.put("currencies", items);

    // Then
    assertDoesNotThrow(value::validate);
    Object itemsFromStruct = value.get("currencies");
    assertInstanceOf(List.class, itemsFromStruct);
    //noinspection unchecked
    List<Object> data = (List<Object>) itemsFromStruct; // could also use List<Struct>
    assertEquals(2, data.size(), "same size");
    assertInstanceOf(Struct.class, data.get(0), "Object list still has type information");
    Struct firstStruct = (Struct) data.get(0);
    assertEquals("Hello", firstStruct.get("title"));
    currencies = data.stream()
        .map(o -> (Struct) o)
        .map(CONVERTER::fromConnectData)
        .filter(Objects::nonNull)  // in case converter has errors, could return null
        .collect(Collectors.toList());
    assertTrue(currencies.size() <= data.size());
    assertEquals("World", currencies.get(1).getTitle(), "struct parsing data worked");
    // slug and url are both Strings, so only comparing values catches a swapped mapping.
    assertEquals("/slug", currencies.get(0).getSlug(), "slug survives the round trip");
    assertEquals("http://localhost", currencies.get(0).getUrl(), "url survives the round trip");
  }

  /** Maps {@link Currency} to and from a Struct that follows {@code CURRENCY_ITEM_SCHEMA}. */
  static class CurrencyConverter implements ConnectPOJOConverter<Currency> {

    @Override
    public Schema getSchema() {
      return CURRENCY_ITEM_SCHEMA;
    }

    @Override
    public Currency fromConnectData(Struct s) {
      // Argument order matches how Currency is constructed in the test: (code, title, slug, url).
      // Simple conversion, but more complex types could throw errors.
      return new Currency(
          s.getString("code"),
          s.getString("title"),
          s.getString("slug"),
          s.getString("url")
      );
    }

    @Override
    public Struct toConnectData(Currency c) {
      Struct s = new Struct(getSchema());
      s.put("code", c.getCode());
      s.put("title", c.getTitle());
      s.put("slug", c.getSlug());
      s.put("url", c.getUrl());
      return s;
    }
  }

}

An alternative approach is to use a String schema and Jackson's ObjectMapper to produce the JSON string yourself, then let the converter handle the rest.

final ObjectMapper om = new ObjectMapper();
// The {schema, payload} envelope is built by hand, so the record value is just a String.
// NOTE(review): with value.converter=JsonConverter, a String value is itself written as a JSON
// string, which would double-encode this envelope. It is likely passed through verbatim only
// with a StringConverter. Confirm against the worker config.
final Schema valueSchema = Schema.STRING_SCHEMA;

// for-each currency
try {
  // A fresh envelope per record. The map must exist before anything is put into it.
  Map<String, JsonNode> output = new HashMap<>();
  output.put("schema", new TextNode("TODO")); // replace with JSONConverter schema
  // valueToTree maps the POJO straight to a JsonNode, so the payload is not double-encoded.
  output.put("payload", om.valueToTree(currency));

  String value = om.writeValueAsString(output);
  SourceRecord r = new SourceRecord(...., valueSchema, value);
  records.add(r); // poll return result
} catch (IOException e) {
  // TODO: handle. Swallowing here silently drops this currency's record.
}
// end for-each

return records;

Upvotes: 1

Related Questions