Reputation: 167
I am trying to write a custom stream grouping which writes to Mongodb. I am running a local cluster for now. I have a custom stream class and a mongo object. I write to mongodb in both the prepare() and chooseTask(). It writes to mongodb but the supervisors cannot start. I see this error in the supervisor log:
b.s.d.worker [ERROR] Error on initialization of server mk-worker
java.lang.NoClassDefFoundError: com/mongodb/MongoClient
at storm.starter.MongoMonitorObject.<init>(MongoMonitorObject.java:23) ~[stormjar.jar:0.10.0]
at storm.starter.ModStreamGrouping.prepare(ModStreamGrouping.java:94)
~[stormjar.jar:0.10.0]
I am making changes in the storm starter project for now.
public class ModStreamGrouping implements CustomStreamGrouping, Serializable{
java.util.List<java.lang.Integer> targetTasks = new ArrayList();
@Override
public List<Integer> chooseTasks(int taskId,List<Object> values) {
System.out.println("taskiD = " + taskId);
System.out.println("values = " + values);
return numTasks[0];
}
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, java.util.List<java.lang.Integer> targetTasks) {
MongoMonitorObject mmo = new
System.out.println(" in prep() ");
System.out.println("targetTasks = " + targetTasks);
numTasks = targetTasks.size();
}
}
public class MongoMonitorObject {
private static final Logger LOG = LoggerFactory.getLogger(MongoMonitorObject.class);
public MongoMonitorObject(java.util.List<java.lang.Integer> targetTasks){
try{
MongoClient mongoClient = new MongoClient("localhost", 27017);
DB db = mongoClient.getDB( "loadDB" );
DBCollection collection = db.getCollection("testCollection");
for (Integer task : targetTasks) {
BasicDBObject document = new BasicDBObject();
document.put("tid", task);
collection.insert(document);
}
}
catch (UnknownHostException e) {
System.out.println(" in UnknownHostException ");
LOG.info(" in UnknownHostException ");
}
catch (Exception e) {
System.out.println(" in Exception ");
LOG.info(" in Exception ");
}
}
}
The stream grouping is defined in a ModStreamGrouping.java and mongo connection is defined in MongoMonitorObject.java. Both belong to the package storm.starter.
I can upload the topology but the supervisors cannot spawn workers. There's a small link I'm missing somewhere but I don't know where exactly. I added the following in storm starter's pom.xml to include mongodb connectivity:
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>2.13.3</version>
</dependency>
Upvotes: 1
Views: 611
Reputation: 31895
Edit: I read it here: https://github.com/mongodb/mongo-java-driver
mongodb-java-driver is a all-in-one jar, it contains bson and core
Therefore dependency mongodb-java-driver
is enough.
If use dependency mongodb-driver
, dependencies bson
and core
are needed.
Original post:
Try add mongodb-driver-core
and use the newer version dependencies
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-core</artifactId>
<version>3.2.2</version>
</dependency>
The MongoDB Java Driver uber-artifact, containing mongodb-driver, mongodb-driver-core, and bson
Check it here
Upvotes: 1