balderman

Reputation: 23825

Flink Job design - working with hybrid Kafka topic

I have a Kafka topic that contains a few event types. (This is a given.)
The events are JSON documents.

Let's call the event types A, B, C, D, and E.

I can tell the type from a field that is present in every event.

I want a Flink job that handles events A & B separately (using a session window), sends C & D to another type of window, and drops event E.

Can I implement such a design in Flink?

Thanks

Upvotes: 0

Views: 58

Answers (2)

David Anderson

Reputation: 43697

If you'd rather use the Table/SQL API, that's easily done. E.g.,

WITH use_session_windows AS
  (select * from events where events.type = 'a' or events.type = 'b')
SELECT window_start, window_end, count(*) AS cnt
  FROM TABLE(
    SESSION(DATA => TABLE use_session_windows,
            TIMECOL => DESCRIPTOR(rowtime),  
            GAP => INTERVAL '30' MINUTES))
  GROUP BY window_start, window_end;
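
To make the effect of the 30-minute GAP concrete, here is a rough plain-Kotlin sketch (no Flink involved) of how a session window groups events: a new session starts whenever the distance to the previous event exceeds the gap. The function name sessionize and the sample timestamps are made up for illustration, and how Flink itself treats an event that arrives exactly one gap after the previous one is not something this sketch tries to reproduce.

```kotlin
// Plain-Kotlin illustration of session-window grouping; no Flink dependency.
// sessionize and its parameters are hypothetical names used only for this sketch.
fun sessionize(timestamps: List<Long>, gapMillis: Long): List<List<Long>> {
    val sessions = mutableListOf<MutableList<Long>>()
    for (ts in timestamps.sorted()) {
        val current = sessions.lastOrNull()
        if (current != null && ts - current.last() <= gapMillis) {
            // Still within the gap: extend the open session
            current.add(ts)
        } else {
            // First event, or the gap was exceeded: open a new session
            sessions.add(mutableListOf(ts))
        }
    }
    return sessions
}

fun main() {
    val minute = 60_000L
    // Assumed sample event times (epoch millis), chosen only for the demo
    val eventTimes = listOf(0L, 10 * minute, 50 * minute, 55 * minute)
    for (s in sessionize(eventTimes, 30 * minute)) {
        // Roughly analogous to window_start / cnt in the query above
        println("first=${s.first()} last=${s.last()} cnt=${s.size}")
    }
}
```

Each inner list plays the role of one row in the query result, with its size corresponding to cnt.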

Upvotes: 0

Rion Williams

Reputation: 76597

You could take advantage of Flink's support for side outputs: split each distinct type into its own stream, then operate on those streams separately (or union them downstream, etc.).

Basically:

  • Read your data from a Kafka Topic (via KafkaSource)
  • Map/Process your data via side-outputs to get each specific type
  • Construct your job graph to align with what you need (union and window downstream, etc.)

This might look something like:

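val events = streamEnv
  // kafkaSource: a KafkaSource<YourClass> created via KafkaSource.builder<YourClass>() (placeholder name)
  // watermarkStrategy: the WatermarkStrategy<YourClass> that event-time windows rely on (placeholder name)
  .fromSource(kafkaSource, watermarkStrategy, "events")
  .process(YourTypeSeparatorOperator())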

// Example: Getting A & B events
val a = events.getSideOutput(Tags.a)
val b = events.getSideOutput(Tags.b)

// Union this stream (and act on it via windowing, etc.)
val ab = a.union(b)

// Likewise perform operations necessary for C & D types here

// Eventually merge all of these separate streams together if needed

In the above, YourTypeSeparatorOperator() is a ProcessFunction that inspects the type of each event and emits it to the matching side output:

// Example OutputTag
object Tags {
    val a = OutputTag("a", TypeInformation.of(YourClass::class.java))
    val b = OutputTag("b", TypeInformation.of(YourClass::class.java))
    val c = OutputTag("c", TypeInformation.of(YourClass::class.java))
    val d = OutputTag("d", TypeInformation.of(YourClass::class.java))
}

// Usage
override fun processElement(...) {
   ...
   when (message.type) {
      "a" -> context.output(Tags.a, message)
      "b" -> context.output(Tags.b, message)
      "c" -> context.output(Tags.c, message)
      "d" -> context.output(Tags.d, message)
      // Any other type matches no branch and is not emitted, so it is effectively dropped
   }
}
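
If you want to unit-test the routing decision without spinning up a Flink job, one option is to pull it out of processElement into a pure function. Below is a minimal sketch of that idea in plain Kotlin. Route and routeFor are hypothetical names (not part of Flink), and the sketch assumes A and B share the session window, C and D share the other window, and everything else (E included) is dropped.

```kotlin
// Hypothetical routing targets; these names are illustrative, not part of Flink.
enum class Route { SESSION_WINDOW, OTHER_WINDOW, DROP }

// Maps the value of an event's type field to the branch of the job that handles it.
// Assumption: A/B -> session window, C/D -> the other window, anything else is dropped.
fun routeFor(type: String): Route = when (type.lowercase()) {
    "a", "b" -> Route.SESSION_WINDOW
    "c", "d" -> Route.OTHER_WINDOW
    else -> Route.DROP
}

fun main() {
    // Print the routing decision for each event type from the question
    for (t in listOf("A", "B", "C", "D", "E")) {
        println("$t -> ${routeFor(t)}")
    }
}
```

Inside processElement you would then switch on the result of routeFor(message.type) and call context.output(...) with the corresponding tag, simply returning for Route.DROP.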

Upvotes: 2
