Aditya

Reputation: 13

org.apache.kafka.connect.errors.DataException: Struct schemas do not match

I am trying to create and populate an array schema of strings nested inside an array schema of a struct type, using org.apache.kafka.connect.data.Schema in Java, but I get this error:

org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:245)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:215)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:204)
    at com.sintec.ra.fixture.BookProgramFixture.createAiringStruct(BookProgramFixture.java:60)
    at com.sintec.ra.handlers.BookProgramTopicHandlerTest.mockAiringStruct(BookProgramTopicHandlerTest.java:132)
    at com.sintec.ra.handlers.BookProgramTopicHandlerTest.init(BookProgramTopicHandlerTest.java:107)
    at com.sintec.ra.handlers.BookProgramTopicHandlerTest.<init>(BookProgramTopicHandlerTest.java:76)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.junit.runners.BlockJUnit4ClassRunner.createTest(BlockJUnit4ClassRunner.java:217)
    at org.junit.runners.BlockJUnit4ClassRunner$1.runReflectiveCall(BlockJUnit4ClassRunner.java:266)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.BlockJUnit4ClassRunner.methodBlock(BlockJUnit4ClassRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:538)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:760)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:460)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:206)

For example, this is the structure I am trying to build:

"airingTo" : [ {
      "dow" : [ "SATURDAY" ]
      }]

public static Struct createAiringStruct()
{
    Schema valueSchema = createValueSchemaForProgram();
    Struct valueStruct = new Struct(valueSchema)
            .put("airingTo", getAiringList());
    return valueStruct;
}

private static Schema createValueSchemaForProgram()
{
    return SchemaBuilder.struct().name("PROGRAM")
            .field("airingTo", SchemaBuilder.array(createAiringListSchema()))
            .build();
}

private static Schema createAiringListSchema()
{
    return SchemaBuilder.struct().name("airingTo")
            .field("dow", SchemaBuilder.array(Schema.STRING_SCHEMA))
            .build();
}

private static List<Struct> getAiringList()
{
    Struct valueStruct = new Struct(createAiringListSchema())
            .put("dow", Arrays.asList("SATURDAY"));
    List<Struct> dayPartList = new ArrayList<>();
    dayPartList.add(valueStruct);
    return dayPartList;
}

Upvotes: 1

Views: 2366

Answers (1)

OneCricketeer

Reputation: 191671

I'm not sure where you got .name("PROGRAM") from, but the following works for me.

It seems you were mixing struct names with fields.

    Schema dowSchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
    Schema airingToItemSchema = SchemaBuilder.struct()
            .field("dow", dowSchema)
            .build();
    Schema airingToSchema = SchemaBuilder.array(airingToItemSchema).build();
    Schema rootSchema = SchemaBuilder.struct()
            .field("airingTo", airingToSchema)
            .build();

    Struct item = new Struct(airingToItemSchema)
            .put("dow", Collections.singletonList("SATURDAY"));
    Struct rootStruct = new Struct(rootSchema)
            .put("airingTo", Collections.singletonList(item));

    System.out.println(rootStruct);

Outputs

Struct{airingTo=[Struct{dow=[SATURDAY]}]}

Upvotes: 1
