Reputation: 261
I want to get data from one database through a spout and process the data and store it in another database using trident.I am new to storm and trident and i am not sure how to implement it.I got the data from the database in a spout(separate java class which implements IRichSpout which is supported by trident) and i emit it as an object.I need to pass it to the trident topology for processing(counting the number of records) and storing it to a database.
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1",spout)
now the new stream takes a spout as an input i.e the syntax is
Stream storm.trident.TridentTopology.newStream(String txId, IRichSpout spout)
but i want to give the object emitted by the spout as an input to the stream for the trident to process and save to database.So how can i bring my spout class inside trident and pass it to new stream or should i combine both spout and trident as a same class??
can someone help plz.....
Upvotes: 2
Views: 1385
Reputation: 8171
You can do something like
MyFooSpout spout = new MyFooSpout();
topology.newStream("spout1", spout)....
Where the MyFooSpout
class should implements the IRichSpout
From the trident tutorial
The newStream
method in TridentTopology
creates a stream of data in the topology reading from any input source.
In your case it could be the MyFooSpout
class
.I got the data from the database in a spout(separate java class which implements IRichSpout which is supported by trident) and i emit it as an object
can you please clarify what exactly are you referring to? How is your spout code looks like? As a very generic example if we write something like (taken from the tutorial page)
TridentState wordCounts = topology.newStream("spout1", spout).each(new Fields("sentence"), new Split(), new Fields("word"))
it means that the spout
supposed to emit a single field namely sentence
. By calling each
the Split
function will be applied to each tuple in the stream, which will perform based on whatever code is written by taking the sentence
field. However this could vary depending on your requirement. e.g it could be a Filter
as MyFilter extends BaseFilter
Or a function
as MyCustomFuction extends BaseFunction
. Check out the API page for more details.
Upvotes: 1