Shigure

Reputation: 157

Which way of using Flink's broadcast state is better?

Using Flink version 1.13.1.

The code has been streamlined.

I use broadcast state in my project; the broadcast source sends some configuration every 5 minutes. Because one process function can only connect to one broadcast source, I defined a case class to carry three kinds of configuration.

The case class:

case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
                                  userSegmentInfo: Map[String, (String, String)],
                                  lacciRegRel: Map[String, Set[String]])

and the broadcast state code:

    val orderConfBroadcast = env.addSource(new OrderConfSource(dbConfig, serverConfig.smsRuleRedis))
      .name("order_conf_load")
      .uid("order_conf_load")
      .setParallelism(1)
      .broadcast(new MapStateDescriptor[String, OrderConfBroadcastBean]("order_conf_broadcast", createTypeInformation[String], createTypeInformation[OrderConfBroadcastBean]))

I would like to know which of the following two ways of using broadcast state in the process function is correct, or which one has better performance and lower memory usage, and why.

First usage:

class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
                         var orderInfo: List[OrderInfoBean],
                         redisConf: String,
                         var lacciRegRel: Map[String, Set[String]]) extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {

  override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
    userSegmentInfo.get("xxx")
    orderInfo.map(xxx)
  }

  override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
    if (value.orderRuleConfig.nonEmpty) {
      orderInfo = value.orderRuleConfig
    }
    if (value.userSegmentInfo.nonEmpty) {
      userSegmentInfo = value.userSegmentInfo
    }
    if (value.lacciRegRel.nonEmpty) {
      lacciRegRel = value.lacciRegRel
    }
  }
}

Second way:

class OrderFilterProcess(var userSegmentInfo: Map[String, (String, String)],
                         var orderInfo: List[OrderInfoBean],
                         redisConf: String,
                         var lacciRegRel: Map[String, Set[String]]) extends KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean] {

  val stateDescriptor = new MapStateDescriptor[String, OrderConfBroadcastBean]("order_conf_broadcast", createTypeInformation[String], createTypeInformation[OrderConfBroadcastBean])

  override def processElement(regLacci: RegLacciBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#ReadOnlyContext, out: Collector[OrderResultBean]): Unit = {
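    // Use the descriptor declared above (stateDescriptor), not an undefined name.
    val state = ctx.getBroadcastState(stateDescriptor)
    // Prefer the broadcast value; fall back to the constructor-supplied map.
    Option(state.get("order_state")).flatMap(_.userSegmentInfo.get("xxx")).orElse(userSegmentInfo.get("xxx"))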
  }

  override def processBroadcastElement(value: OrderConfBroadcastBean, ctx: KeyedBroadcastProcessFunction[String, RegLacciBean, OrderConfBroadcastBean, OrderResultBean]#Context, out: Collector[OrderResultBean]): Unit = {
    ctx.getBroadcastState(stateDescriptor).put("order_state", value)
  }
}

Upvotes: 0

Views: 519

Answers (1)

David Anderson

Reputation: 43409

The biggest difference between those two implementations is that in the first you are storing the data received from the broadcast stream into variables that will be lost when the job fails, whereas in the second you are using broadcast state, which will be checkpointed and recovered.

There is some overhead for version two. You'd have to measure it to find out how much -- but in both cases the data will be in memory, so the difference shouldn't be huge.
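
One behavioral difference worth checking when moving from the first version to the second: the first version ignores an empty list or map in an incoming message, whereas the second overwrites the stored bean wholesale. If that nonEmpty behavior matters, a merge step could be applied before writing to broadcast state. The following is only a minimal sketch under assumptions: `OrderInfoBean` is a hypothetical stand-in (its fields are not shown in the question), and `OrderConfMerge` is a helper name introduced here for illustration.

```scala
// Hypothetical stand-in: the question does not show OrderInfoBean's fields.
case class OrderInfoBean(id: String)

// Same shape as the case class in the question.
case class OrderConfBroadcastBean(orderRuleConfig: List[OrderInfoBean],
                                  userSegmentInfo: Map[String, (String, String)],
                                  lacciRegRel: Map[String, Set[String]])

// Illustrative helper (not part of Flink).
object OrderConfMerge {
  // For each of the three parts, take the incoming value if it is non-empty,
  // otherwise keep the previously stored one. This mirrors the nonEmpty
  // checks in the first version of OrderFilterProcess.
  def merge(previous: Option[OrderConfBroadcastBean],
            incoming: OrderConfBroadcastBean): OrderConfBroadcastBean =
    previous match {
      case None => incoming
      case Some(old) =>
        OrderConfBroadcastBean(
          if (incoming.orderRuleConfig.nonEmpty) incoming.orderRuleConfig else old.orderRuleConfig,
          if (incoming.userSegmentInfo.nonEmpty) incoming.userSegmentInfo else old.userSegmentInfo,
          if (incoming.lacciRegRel.nonEmpty) incoming.lacciRegRel else old.lacciRegRel
        )
    }
}

// Possible use inside processBroadcastElement of the second version:
//   val state = ctx.getBroadcastState(stateDescriptor)
//   state.put("order_state", OrderConfMerge.merge(Option(state.get("order_state")), value))
```

Keeping the merge as a pure function means it can be unit tested without a running Flink job, and the merged bean is what gets checkpointed.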

Upvotes: 1
