mini.1601

Reputation: 167

Write to MongoDB from Apache Storm

I am trying to write a custom stream grouping that writes to MongoDB. I am running a local cluster for now. I have a custom stream grouping class and a Mongo helper object, and I write to MongoDB in both prepare() and chooseTasks(). The data does reach MongoDB, but the supervisors cannot start their workers. I see this error in the supervisor log:

b.s.d.worker [ERROR] Error on initialization of server mk-worker
java.lang.NoClassDefFoundError: com/mongodb/MongoClient
    at storm.starter.MongoMonitorObject.<init>(MongoMonitorObject.java:23) ~[stormjar.jar:0.10.0]
    at storm.starter.ModStreamGrouping.prepare(ModStreamGrouping.java:94) ~[stormjar.jar:0.10.0]
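A NoClassDefFoundError means a class that was present at compile time cannot be found by the class loader at run time. As a quick check (a sketch that is not part of the original post; ClasspathCheck and describe are hypothetical names), the following resolves a class by name and reports which jar or directory it was loaded from, or that it is missing. Running it with the same classpath as the worker, or calling describe() from inside prepare(), shows whether com.mongodb.MongoClient is visible there.

```java
import java.security.CodeSource;

public class ClasspathCheck {

    // Reports whether a class can be loaded and, if so, where it was loaded from.
    public static String describe(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes from the JDK itself usually have no CodeSource
            return className + " -> " + (source == null ? "bootstrap/unknown" : source.getLocation());
        } catch (ClassNotFoundException e) {
            return className + " -> NOT on classpath";
        } catch (LinkageError e) {
            // The class exists but one of its own dependencies is missing
            return className + " -> found but failed to link: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("com.mongodb.MongoClient"));
        System.out.println(describe("org.bson.BSONObject"));
    }
}
```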

I am making changes in the storm-starter project for now.

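package storm.starter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

public class ModStreamGrouping implements CustomStreamGrouping, Serializable {

  // Task ids of the downstream bolt, filled in by prepare()
  private List<Integer> targetTasks = new ArrayList<Integer>();

  @Override
  public List<Integer> chooseTasks(int taskId, List<Object> values) {
      System.out.println("taskId = " + taskId);
      System.out.println("values = " + values);
      // Route every tuple to the first target task
      return Collections.singletonList(targetTasks.get(0));
  }

  @Override
  public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
      System.out.println(" in prep() ");
      System.out.println("targetTasks = " + targetTasks);
      this.targetTasks = targetTasks;
      // Record the target task ids in MongoDB (this is where MongoClient is first needed)
      new MongoMonitorObject(targetTasks);
  }
}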

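package storm.starter;

import java.net.UnknownHostException;
import java.util.List;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoMonitorObject {
  private static final Logger LOG = LoggerFactory.getLogger(MongoMonitorObject.class);

  // Inserts one document {tid: <task id>} per target task into loadDB.testCollection
  public MongoMonitorObject(List<Integer> targetTasks) {
    MongoClient mongoClient = null;
    try {
      mongoClient = new MongoClient("localhost", 27017);
      DB db = mongoClient.getDB("loadDB");
      DBCollection collection = db.getCollection("testCollection");
      for (Integer task : targetTasks) {
        BasicDBObject document = new BasicDBObject();
        document.put("tid", task);
        collection.insert(document);
      }
    } catch (UnknownHostException e) {
      LOG.error("Unknown MongoDB host", e);
    } catch (Exception e) {
      LOG.error("Failed to write target tasks to MongoDB", e);
    } finally {
      // Release the connection opened above
      if (mongoClient != null) {
        mongoClient.close();
      }
    }
  }
}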

The stream grouping is defined in ModStreamGrouping.java and the MongoDB connection in MongoMonitorObject.java. Both belong to the package storm.starter.

I can upload the topology, but the supervisors cannot spawn workers. I am missing a small piece somewhere, but I don't know where exactly. I added the following to storm-starter's pom.xml to include MongoDB connectivity:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongo-java-driver</artifactId>
    <version>2.13.3</version>
</dependency>
<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>bson</artifactId>
    <version>2.13.3</version>
</dependency>
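Both frames in the trace come from stormjar.jar, the jar Storm ships to the workers. A NoClassDefFoundError at worker start-up therefore often means the driver classes were never packaged into the jar that was submitted, for example if it was built without its dependencies. The sketch below (not from the original post; JarContentsCheck, containsClass and the default jar path are hypothetical) uses java.util.jar.JarFile to check whether a built jar actually contains the MongoDB classes.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarContentsCheck {

    // Returns true if the jar at jarPath contains the compiled class className.
    public static boolean containsClass(String jarPath, String className) throws IOException {
        // Class files are stored as package/path/Name.class inside the jar
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical default path: pass the jar you actually submit with `storm jar`
        String jarPath = args.length > 0 ? args[0] : "target/storm-starter-jar-with-dependencies.jar";
        for (String cls : new String[] {"com.mongodb.MongoClient", "org.bson.BSONObject"}) {
            System.out.println(cls + " bundled: " + containsClass(jarPath, cls));
        }
    }
}
```

If either line prints false, the workers will not find that class unless it is supplied some other way (for example, in Storm's own lib directory).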

Upvotes: 1

Views: 611

Answers (1)

Haifeng Zhang

Reputation: 31895

Edit: According to https://github.com/mongodb/mongo-java-driver, mongo-java-driver is an all-in-one jar: it contains bson and the driver core.

Therefore the mongo-java-driver dependency alone is enough.

If you use the mongodb-driver dependency instead, you also need the bson and mongodb-driver-core dependencies.

Original post:

Try adding mongodb-driver-core and using newer dependency versions:

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-core</artifactId>
    <version>3.2.2</version>
</dependency>

The mongo-java-driver artifact is described as "The MongoDB Java Driver uber-artifact, containing mongodb-driver, mongodb-driver-core, and bson".

Check it here

Upvotes: 1
