whisperbye

Reputation: 75

Apache Pulsar schema validation with a JSON string

In my case, raw JSON strings are sent to the topic and I can't hard-code a POJO class, so I want to use Pulsar's schema feature to validate their structure. I have a topic "my-topic" associated with the JSON schema below, and I try to send a message to it:

var producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
        .topic("my-topic")
        .create();
producer.send("{\"y\": 1}".getBytes()); // note: the value is 1 (a number), not a string

var reader = client.newReader(Schema.AUTO_CONSUME())
        .topic("my-topic")
        .startMessageId(MessageId.earliest)
        .create();
var message = reader.readNext();
I got {"y": 1}

My question is: how does Pulsar schema validation work? I expected the message to be rejected.

{
  "version": 1,
  "schemaInfo": {
    "name": "my-topic",
    "schema": {
      "type": "record",
      "name": "Data",
      "namespace": "com.iot.test",
      "fields": [
        {
          "name": "y",
          "type": [
            "null",
            "string"
          ]
        }
      ]
    },
    "type": "JSON",
    "properties": {
      "__alwaysAllowNull": "true"
    }
  }
}

Upvotes: 1

Views: 3120

Answers (2)

whisperbye

Reputation: 75

My fault. I just needed to disable automatic schema updates on the namespace:

v2.5.0
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable iot/test

v2.4.2
bin/pulsar-admin namespaces set-schema-autoupdate-strategy --disable iot/test

Upvotes: 1

David Kjerrumgaard

Reputation: 1076

The Schema.AUTO_PRODUCE_BYTES setting is useful for transferring data from a producer to a Pulsar topic that has a schema because it ensures that the sent message is compatible with the topic's schema. However, I don't see where you specified the schema for the topic.

A topic is assigned a schema automatically when you connect a typed producer or consumer, e.g.

Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
    .topic("sensor-data")
    .sendTimeout(3, TimeUnit.SECONDS)
    .create();

But you have stated that you cannot do this because you "can't hard code POJO". Therefore your only other option to assign a schema to the topic (so it can enforce message schema compatibility) is to use the REST API calls for manual schema management.

Based on your schema, your schema-definition file would look something like the following:

{
  "type": "JSON",
  "schema": "{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"com.iot.test\",\"fields\":[{\"name\":\"y\",\"type\":[\"null\",\"string\"],\"default\":null}]}",
  "properties": {}
}
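
As a rough illustration of the manual route, the sketch below builds that payload and the matching upload request in plain Java (11+), without any Pulsar dependency. The class and method names are hypothetical, the admin URL http://localhost:8080 is an assumed default, and the tenant/namespace "iot/test" is taken from the commands in the other answer. The path /admin/v2/schemas/{tenant}/{namespace}/{topic}/schema is my understanding of the schema upload endpoint, so check it against the REST API reference for your version. The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds the schema-upload request without sending it.
class SchemaUploadSketch {

    // Escape a JSON document so it can be embedded as a JSON string value.
    // Assumes single-line input with no control characters.
    static String escapeJson(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Wrap a record definition in the schema-definition shape shown above.
    static String buildPayload(String avroSchemaJson) {
        return "{\"type\":\"JSON\",\"schema\":\"" + escapeJson(avroSchemaJson)
                + "\",\"properties\":{}}";
    }

    // adminUrl, tenant, namespace and topic are placeholders for your cluster.
    static HttpRequest buildRequest(String adminUrl, String tenant, String namespace,
                                    String topic, String payload) {
        URI uri = URI.create(adminUrl + "/admin/v2/schemas/" + tenant + "/"
                + namespace + "/" + topic + "/schema");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
    }

    public static void main(String[] args) {
        String avro = "{\"type\":\"record\",\"name\":\"Data\",\"namespace\":\"com.iot.test\","
                + "\"fields\":[{\"name\":\"y\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        String payload = buildPayload(avro);
        HttpRequest request =
                buildRequest("http://localhost:8080", "iot", "test", "my-topic", payload);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(payload);
        // To send it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

Building the escaped string in code avoids hand-escaping mistakes such as an unbalanced bracket inside the "schema" value.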

HTH

Upvotes: 0
