Gowtham

Reputation: 87

How to create PCollection<Row> from PCollection<String> for performing Beam SQL transforms

I am trying to implement a data pipeline that joins multiple unbounded sources from Kafka topics. I can connect to a topic and read the data as PCollection<String>, and I need to convert it into PCollection<Row>. I split the comma-delimited string into an array and use a schema to convert it into a Row. But how can I build the schema and bind values to it dynamically?

Even if I create a separate class for building the schema, is there a way to bind the string array directly to the schema?

Below is my current working code. It is static, has to be rewritten every time I build a pipeline, and grows with the number of fields.

final Schema sch1 =
  Schema.builder().addStringField("name").addInt32Field("age").build();

PCollection<KafkaRecord<Long, String>> kafkaDataIn1 = pipeline
  .apply(
    KafkaIO.<Long, String>read()
      .withBootstrapServers("localhost:9092")
      .withTopic("testin1")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .updateConsumerProperties(
         ImmutableMap.of("group.id", (Object)"test1")));

PCollection<Row> Input1 = kafkaDataIn1.apply(
  ParDo.of(new DoFn<KafkaRecord<Long, String>, Row>() {
    @ProcessElement
    public void processElement(
        ProcessContext processContext,
        final OutputReceiver<Row> emitter) {

          KafkaRecord<Long, String> record = processContext.element();
          final String input = record.getKV().getValue();

          final String[] parts = input.split(",");

          emitter.output(
            Row.withSchema(sch1)
               .addValues(
                   parts[0],
                   Integer.parseInt(parts[1])).build());
        }}))
  .apply("window",
     Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
       .triggering(AfterWatermark.pastEndOfWindow())
       .withAllowedLateness(Duration.ZERO)
       .accumulatingFiredPanes());

Input1.setRowSchema(sch1);

My expectation is to achieve the same thing as the code above, but in a dynamic, reusable way.

Upvotes: 1

Views: 1830

Answers (1)

Romain Manni-Bucau

Reputation: 3422

The schema is set on the PCollection, so it is not dynamic. If you want to build it lazily, you need to use a format/coder that supports it; Java serialization or JSON are examples.
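Even with a schema that is fixed once the pipeline is constructed, the per-field binding does not have to be written by hand. The following is only a sketch in plain Java, assuming the field list comes from a spec string such as "name:STRING,age:INT32"; the class SchemaDrivenParser, its Type enum and the spec format are hypothetical and not part of Beam. In a pipeline, the same loop could walk schema.getFields() instead, and the resulting list could be passed to Row.withSchema(schema).addValues(values).build().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Beam: converts the parts of a delimited
// record into typed values, driven by a field list instead of per-field code.
class SchemaDrivenParser {

    // Stand-in for the Beam field types used in the question (STRING, INT32).
    enum Type { STRING, INT32, INT64, DOUBLE }

    // Parses a spec such as "name:STRING,age:INT32" into an ordered
    // field-name -> type map (assumed format, type names in upper case).
    static Map<String, Type> parseSpec(String spec) {
        Map<String, Type> fields = new LinkedHashMap<>();
        for (String entry : spec.split(",")) {
            String[] nameAndType = entry.trim().split(":");
            fields.put(nameAndType[0].trim(), Type.valueOf(nameAndType[1].trim()));
        }
        return fields;
    }

    // Converts each part to the Java type its field expects, in field order.
    static List<Object> toValues(Map<String, Type> fields, String[] parts) {
        if (parts.length != fields.size()) {
            throw new IllegalArgumentException(
                "Expected " + fields.size() + " values but got " + parts.length);
        }
        List<Object> values = new ArrayList<>();
        int i = 0;
        for (Type type : fields.values()) {
            String raw = parts[i++].trim();
            switch (type) {
                case INT32:  values.add(Integer.parseInt(raw));   break;
                case INT64:  values.add(Long.parseLong(raw));     break;
                case DOUBLE: values.add(Double.parseDouble(raw)); break;
                default:     values.add(raw);                     break;
            }
        }
        return values;
    }
}
```

With this shape, adding a field means changing the spec rather than the DoFn body, but the schema itself is still decided before the pipeline runs.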

That said, to benefit from the SQL features you can also use a static schema that contains the fields you query plus the other fields. The static part lets you run your SQL, and you don't lose the additional data.
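One way to read this suggestion, as a rough sketch: keep the first few comma-separated parts as the fields the SQL queries, and fold everything after them into a single trailing string. The class QueryFieldsPlusExtra and the idea of a catch-all "extra" field are assumptions made for illustration. A matching static schema could then be Schema.builder().addStringField("name").addInt32Field("age").addStringField("extra").build(); the values below are still raw strings, so a field like age would need parsing before it goes into a Row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Beam: separates the fields a query needs
// from the rest of the record without dropping the rest.
class QueryFieldsPlusExtra {

    // Returns queryFieldCount + 1 strings: the first queryFieldCount parts
    // individually, then the untouched remainder of the line ("" if none).
    static List<String> split(String line, int queryFieldCount) {
        // A limit of queryFieldCount + 1 stops splitting after the query
        // fields, so the last element keeps its embedded commas.
        String[] parts = line.split(",", queryFieldCount + 1);
        if (parts.length < queryFieldCount) {
            throw new IllegalArgumentException(
                "Expected at least " + queryFieldCount + " fields in: " + line);
        }
        List<String> values = new ArrayList<>(Arrays.asList(parts));
        if (values.size() == queryFieldCount) {
            values.add(""); // the record carried no additional data
        }
        return values;
    }
}
```

The SQL can then select and join on name and age, while the extra column travels along unchanged and can be split again downstream if needed.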

Romain

Upvotes: 1
