Reputation: 75
In my case, I have some raw JSON string data send to the topic and can't hard code POJO class, I want to use the pulsar schema feature to validate the structure. I have a topic "my-topic" and associated with JSON schema below, then I try to transmission some message.
var producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES();
producer.send("{\"y\": 1}".getBytes()); // here! the value is 1(number) not string.
var reader = client.newReader(Schema.AUTO_CONSUME())
var message = reader.readNext();
I got {"y": 1}
my question is how pulsar schema works? The message should be rejected.
{
"version": 1,
"schemaInfo": {
"name": "my-topic",
"schema": {
"type": "record",
"name": "Data",
"namespace": "com.iot.test",
"fields": [
{
"name": "y",
"type": [
"null",
"string"
]
}
]
},
"type": "JSON",
"properties": {
"__alwaysAllowNull": "true"
}
}
}
Upvotes: 1
Views: 3120
Reputation: 75
my fault. just need to set
v2.5.0
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable iot/test
v2.4.2
bin/pulsar-admin namespaces set-schema-autoupdate-strategy --disable iot/test
Upvotes: 1
Reputation: 1076
The Schema.AUTO_PRODUCE_BYTES
setting is useful for transferring data from a producer to a Pulsar topic that has a schema because it ensures that the sent message is compatible with the topic's schema. However, I don't see where you specified the schema for the topic.
A topic is assigned a schema automatically when you connect a typed producer or consumer, e.g.
Producer producer = client.newProducer(JSONSchema.of(SensorReading.class))
.topic("sensor-data")
.sendTimeout(3, TimeUnit.SECONDS)
.create();
But you have stated that you cannot do this because you "can't hard code POJO". Therefore your only other option to assign a schema to the topic (so it can enforce message schema compatibility) is to use the REST API calls for manual schema management.
Based on your schema, your schema-definition file would look something like the following:
{
"type": "JSON",
"schema": "{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"com.iot.test\",\"fields\":[{\"name\":\"y\",\"type\":[\"null\",\"string\"],\"default\":null}}",
"properties": {}
}
HTH
Upvotes: 0