M.K

Reputation: 403

Storm Trident: How to use IPartitionedTridentSpout?

In our system, multiple data generators create file content in a shared file system, with the DataSourceId indicated in the filename. We need a fair scheduling mechanism to read the files generated by all sources; parse, flatten, and enrich (using reference data) the records in those files; batch the enriched records; and write them to a database.

I use an IPartitionedTridentSpout. The topology looks like this:

TransactionalTridentEsrSpout spout
    = new TransactionalTridentEsrSpout(NUM_OF_PARTITIONS);

TridentTopology topology = new TridentTopology();
topology.newStream("FileHandlerSpout", spout)
         .each(new Fields("filename", "esr"), new Utils.PrintFilter())
         .parallelismHint(NUM_OF_PARTITIONS)
         .shuffle()
         .each(new Fields("filename", "record"), new RecordFlattenerAndEnricher(), new elds("record-enriched"))
         .each(new Fields("filename", "record-enriched"), new Utils.PrintFilter())
         .project(new Fields("record-enriched")) // pass only the required field downstream
         .parallelismHint(PARALLELISM_HINT_FOR_ESR_FLATTENER_ENRICHER)
         .shuffle()
         .aggregate(new Fields("record-enriched"), new BlockWriterToDb(), new Fields("something"))
         .each(new Fields("something"), new Utils.PrintFilter())
         .parallelismHint(PARALLELISM_HINT_FOR_GP_WRITER);

Since the data files are very big (usually around 1 million records), I read them in small batches of 10K records. For every transactionId generated by the Coordinator, my Emitter emits the next 10K records of the current/next file in its partition. The final BlockWriterToDb aggregates the enriched records into a buffer and, when its complete() method is called, writes the buffer to the DB.
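For illustration, here is a minimal sketch of that emit logic. The FileSliceMeta class, the readRecords() helper, and the field names are hypothetical simplifications; in the real spout the two emit methods correspond to emitPartitionBatchNew and emitPartitionBatch on IPartitionedTridentSpout.Emitter (shown with the pre-Apache storm.trident package names):

    import java.io.Serializable;
    import java.util.List;

    import backtype.storm.tuple.Values;
    import storm.trident.operation.TridentCollector;
    import storm.trident.topology.TransactionAttempt;

    // Hypothetical metadata: records exactly which slice of which file a
    // transaction covered, so a replay can reproduce the batch verbatim.
    class FileSliceMeta implements Serializable {
        final String filename;
        final long startRecord; // index of the first record in this slice
        final int count;        // number of records in the slice

        FileSliceMeta(String filename, long startRecord, int count) {
            this.filename = filename;
            this.startRecord = startRecord;
            this.count = count;
        }
    }

    class EsrEmitLogic {
        private static final int BATCH_SIZE = 10_000;
        private String currentFile; // set elsewhere when a new file is picked up

        // New transaction: emit the next 10K records after the previous slice
        // and return metadata describing exactly what was emitted.
        FileSliceMeta emitNewBatch(TransactionAttempt tx, TridentCollector collector,
                                   FileSliceMeta lastMeta) {
            long start = (lastMeta == null) ? 0 : lastMeta.startRecord + lastMeta.count;
            List<String> records = readRecords(currentFile, start, BATCH_SIZE);
            for (String record : records) {
                collector.emit(new Values(currentFile, record));
            }
            return new FileSliceMeta(currentFile, start, records.size());
        }

        // Replay: Trident hands back the stored metadata, so the Emitter can
        // re-read and re-emit the identical slice.
        void reEmitBatch(TransactionAttempt tx, TridentCollector collector,
                         FileSliceMeta meta) {
            for (String record : readRecords(meta.filename, meta.startRecord, meta.count)) {
                collector.emit(new Values(meta.filename, record));
            }
        }

        private List<String> readRecords(String filename, long start, int count) {
            // hypothetical: read up to `count` records starting at record `start`
            throw new UnsupportedOperationException("file reading omitted");
        }
    }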

The topology works fine, but I have the following question:

The parallelismHint for the PartitionedTridentSpout, which determines the number of Emitters, is set to the number of partitions. The parallelismHint for the next two layers (the FlattenerAndEnricher and the BlockWriterToDb) needs to be set much higher, because there is a lot of work to be done in those stages. Since there is no groupBy requirement here, I use shuffle() between all stages.

When a particular downstream bolt dies, Trident is expected to call the Emitter with the appropriate old metadata, asking it to re-emit. But since a shuffle has happened, the records from any one Emitter's emit will have landed on multiple downstream bolts. So how can Trident call the appropriate Emitters so that exactly the same records are re-emitted? And even if Trident does call the right Emitters, each one will re-emit its whole 10K batch, of which only some records failed. How is this whole sequence handled by Storm, and how should we design the application logic here to achieve fault tolerance with exactly-once semantics?

Upvotes: 1

Views: 483

Answers (1)

teu

Reputation: 979

When using Trident, the entire batch succeeds, or the entire batch fails. When a batch fails, the spout should (automatically) replay the entire batch, and you won't be able to pick and choose among its records at emit time.

To get exactly-once semantics, your downstream logic / DB update should either be idempotent, or ignore replayed items by keeping track of the batch ID of items that were already applied.
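Here is a minimal sketch of that batch-ID bookkeeping. The in-memory map stands in for your real DB, and the class and field names are illustrative; Trident's built-in transactional State implementations do essentially this for you:

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical value layout: each key stores its value plus the txid of
    // the batch that last wrote it.
    class StoredValue {
        final long value;
        final long txid;
        StoredValue(long value, long txid) { this.value = value; this.txid = txid; }
    }

    class TransactionalWriter {
        private final Map<String, StoredValue> db = new HashMap<>(); // stand-in for the real DB

        // Apply one batch's updates. Because Trident replays a failed batch
        // with the SAME txid, a key whose stored txid equals the current txid
        // has already seen this batch, and the update is skipped.
        void applyBatch(long txid, Map<String, Long> deltas) {
            for (Map.Entry<String, Long> e : deltas.entrySet()) {
                StoredValue current = db.get(e.getKey());
                if (current != null && current.txid == txid) {
                    continue; // replayed batch, already applied: skip for exactly-once
                }
                long base = (current == null) ? 0 : current.value;
                db.put(e.getKey(), new StoredValue(base + e.getValue(), txid));
            }
        }
    }

Note that this check is only safe because a transactional spout applies batches to state in strict txid order, so a stored txid equal to the current one can only mean a replay.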

Upvotes: 0
