arseny

Reputation: 386

Can I traverse the items in a KTable from an external method?

I have a Kafka topic and a KTable that listens to it.

I want to write an HTTP POST handler that traverses the current items in the KTable, performs some action on them, and writes the results back to the topic.

so basically i have:

private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
stream.cleanUp()
stream.start()

....

override def refreshTokens = {

    accessTokenTable.mapValues {
        new ValueMapper[String, String] {
            override def apply(value: String) = {
                value
            }
        }
    }.print(token_topic_name)
}

When I call this method, nothing is printed or written to the topic.

What am I missing? Is my only choice to copy the messages from the KTable into a HashMap and read them from there? That would defeat the whole point of KTables.

Upvotes: 5

Views: 505

Answers (2)

miguno

Reputation: 15077

Quoting the asker's own answer: "the correct solution is to use GlobalKTable to avoid 'the state store may have migrated to another instance' errors as discussed here."

Since you answered your own question and apparently ran into another issue in your follow-up, let me expand on what you said in your answer to help other readers of this thread.

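  • If you are using a KTable (which is partitioned, i.e. each "instance" of a KTable sees only a part of the total table data), you can hit the "state store may have migrated" exception, for example while the store is being rebalanced or is not ready yet. Typically, what you need to do is guard against this exception and retry. Think: try-catch-retry.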
  • If you are using a GlobalKTable, then you are side-stepping this problem because each instance of a GlobalKTable has a full copy of the entire table data.
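The try-catch-retry idea from the first bullet could be sketched roughly as follows. This is only a minimal illustration, not code from the Kafka Streams docs: `RetrySketch`, `retry`, `attempts`, `backoffMs` and `isRetriable` are hypothetical names made up for this example, and the helper is deliberately generic so it does not depend on Kafka at all.

```scala
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Hypothetical helper: evaluates `fetch`, and if it throws an exception
  // accepted by `isRetriable`, sleeps for `backoffMs` and tries again,
  // up to `attempts` evaluations in total. Other exceptions are rethrown.
  def retry[A](attempts: Int, backoffMs: Long)(isRetriable: Throwable => Boolean)(fetch: => A): A =
    Try(fetch) match {
      case Success(value) => value
      case Failure(e) if attempts > 1 && isRetriable(e) =>
        Thread.sleep(backoffMs)
        retry(attempts - 1, backoffMs)(isRetriable)(fetch)
      case Failure(e) => throw e
    }
}

// Assumed usage with Kafka Streams on the classpath (not compiled here):
//   val store = RetrySketch.retry(10, 100L)(_.isInstanceOf[InvalidStateStoreException]) {
//     stream.store(token_store_string, QueryableStoreTypes.keyValueStore[String, String]())
//   }
```

The attempt count and fixed back-off are arbitrary choices for the sketch; a real service would probably want a timeout or a capped back-off so an HTTP request does not hang indefinitely.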

Note: Normally, you don't make a decision between KTable vs. GlobalKTable because you want to prevent "state store may have migrated" situations, but because the two abstractions provide different semantics to your application. For example, there are many good reasons to use a KTable rather than a GlobalKTable -- and if you do, you simply need to be aware of what we just discussed here (which is covered in the docs, too, but apparently not obvious/clear enough considering that you did run into this question).

Hope this helps!

Upvotes: 2

arseny

Reputation: 386

After a long investigation: the solution is to query the state store behind the table (RocksDB), not the table itself.

as documented here: confluent

the correct solution is to use GlobalKTable to avoid "the state store may have migrated to another instance" errors as discussed here.

This code worked for me with Kafka 0.10.2.1:

    private val accessTokenTable: GlobalKTable[String, String] = builder.globalTable(token_topic_name, token_store_string)

    private val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()
    val store: ReadOnlyKeyValueStore[String,String] = stream.store(token_store_string,QueryableStoreTypes.keyValueStore[String,String]())

....

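    override def refreshTokensFlow = {
       // requires: import scala.collection.JavaConverters._
       val iterator = store.all()
       try {
         iterator.asScala.foreach { tuple =>
           // logic goes here
           println(tuple.key + ": " + tuple.value)
         }
       } finally iterator.close() // release the underlying store iterator
    }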

Upvotes: 0
