When a Storm topology that writes to HDFS through HdfsBolt is submitted to the Hadoop cluster, no topology appears in the Storm UI. The stack trace points at a library class used by the code: org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?].
Error:
42608 [Thread-20-bolt-executor[3 3]] INFO o.a.s.util - Async loop interrupted!
42608 [Thread-19-disruptor-executor[3 3]-send-queue] INFO o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor bolt:[3 3]
42608 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor __acker:[1 1]
42608 [Thread-22-__acker-executor[1 1]] INFO o.a.s.util - Async loop interrupted!
42608 [Thread-21-disruptor-executor[1 1]-send-queue] INFO o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor __acker:[1 1]
42608 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor __system:[-1 -1]
42608 [Thread-24-__system-executor[-1 -1]] INFO o.a.s.util - Async loop interrupted!
42608 [Thread-23-disruptor-executor[-1 -1]-send-queue] INFO o.a.s.util - Async loop interrupted!
42609 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor __system:[-1 -1]
42609 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor kafka_spout:[5 5]
42609 [Thread-25-disruptor-executor[5 5]-send-queue] INFO o.a.s.util - Async loop interrupted!
42609 [Thread-26-kafka_spout-executor[5 5]] INFO o.a.s.util - Async loop interrupted!
42611 [SLOT_1024] INFO o.a.s.d.executor - Shut down executor kafka_spout:[5 5]
42611 [SLOT_1024] INFO o.a.s.d.executor - Shutting down executor forwardToKafka:[4 4]
42611 [Thread-28-forwardToKafka-executor[4 4]] INFO o.a.s.util - Async loop interrupted!
42611 [Thread-27-disruptor-executor[4 4]-send-queue] INFO o.a.s.util - Async loop interrupted!
42612 [SLOT_1024] ERROR o.a.s.d.s.Slot - Error when processing event
java.lang.NullPointerException: null
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.executor$fn__9739.invoke(executor.clj:878) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.executor$mk_executor$reify__9530.shutdown(executor.clj:437) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify__10167$shutdown_STAR___10187.invoke(worker.clj:684) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify$reify__10213.shutdown(worker.clj:724) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:69) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:752) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
42612 [SLOT_1024] ERROR o.a.s.u.Utils - Halting process: Error when processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
at org.apache.storm.utils.Utils.exitProcess(Utils.java:1814) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:796) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
The following is the main topology class. Data is read from Kafka and written to HDFS through HdfsBolt. Part of the data does reach HDFS, but not all worker nodes participate, and the topology never appears in the Storm UI.
Java:
package hdpstrm.hdpstrm;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Properties;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Values;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import hdpstrm.hdpstrm.printBolt;
public class MyMain {
private static HdfsBolt createHdfsBolt() {
// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat();
fileNameFormat.withPath("/user/march26");
fileNameFormat.withPrefix("upcse"); // files start with this prefix
fileNameFormat.withExtension(".csv"); // and end with this extension
return new HdfsBolt().withFsUrl("hdfs://localhost:8020")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
}
public static void main(String[] args) throws Exception {
Config config = new Config();
config.setDebug(false);
config.setNumWorkers(1);
config.put("ALLFILE", (Object)"/home/kdx/out2.txt");
TopologyBuilder tp = new TopologyBuilder();
String kafka_bootstrap = "localhost:6667";
String kafka_topic = args[0];
Builder<String, String> kafka_config = KafkaSpoutConfig.builder(kafka_bootstrap, kafka_topic).setGroupId("group_id");
KafkaSpout<String, String> kafka_spout = new KafkaSpout<String, String>(kafka_config.build());
tp.setSpout("kafka_spout", kafka_spout, 1);
tp.setBolt("bolt", new printBolt()).shuffleGrouping("kafka_spout");
tp.setBolt("forwardToKafka", createHdfsBolt(), 1).shuffleGrouping("bolt");
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafkaTopology", config, tp.createTopology());
//Wait for 40 seconds
Thread.sleep(40000);
//Stop the topology
cluster.shutdown();
System.out.println(" ******** TERMINATED THE LOCAL CLUSTER *********");
StormSubmitter.submitTopologyWithProgressBar("MyMain", config, tp.createTopology());
}
}
The expected result is that the topology appears in the Storm UI and that all worker nodes participate when the Storm jar is run.
The error is due to a bug in storm-hdfs.
The line you're getting an error in is https://github.com/apache/storm/blob/v1.2.1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java#L261
The variable dereferenced on that line is only initialized when the rotation policy is a TimedRotationPolicy; your topology uses a FileSizeRotationPolicy, so the field is still null when cleanup() runs.
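The failure pattern can be reproduced in isolation. This is a hypothetical sketch (the names are illustrative, not the actual storm-hdfs fields): a timer initialized only under one policy, but dereferenced unconditionally on shutdown.

```java
import java.util.Timer;

// Minimal, self-contained reproduction of the bug pattern in
// AbstractHdfsBolt.cleanup(): a timer field that is only set up for a
// timed rotation policy, but cancelled without a null check on shutdown.
public class RotationCleanupDemo {
    private Timer rotationTimer; // stays null for size-based rotation

    void prepare(boolean timedRotation) {
        if (timedRotation) {
            rotationTimer = new Timer(true); // created only for the timed policy
        }
    }

    // Mirrors the buggy cleanup(): no null check before cancel()
    void cleanupUnsafe() {
        rotationTimer.cancel();
    }

    // The fix is a simple guard around the dereference
    void cleanupSafe() {
        if (rotationTimer != null) {
            rotationTimer.cancel();
        }
    }

    public static void main(String[] args) {
        RotationCleanupDemo bolt = new RotationCleanupDemo();
        bolt.prepare(false); // size-based rotation, as in the question
        try {
            bolt.cleanupUnsafe();
        } catch (NullPointerException e) {
            System.out.println("NPE during cleanup, as in the stack trace");
        }
        bolt.cleanupSafe(); // guarded version shuts down cleanly
        System.out.println("safe cleanup completed");
    }
}
```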
You can file a bug report at https://issues.apache.org/jira. PRs are also welcome at https://github.com/apache/storm.
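Until the bug is fixed upstream, one workaround in your createHdfsBolt() is to switch to a TimedRotationPolicy, which does initialize the timer that cleanup() later cancels. A sketch against the storm-hdfs 1.2.x API (the one-minute interval is an arbitrary example, not a recommendation):

```java
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;

// Rotate files on a timer instead of on file size; AbstractHdfsBolt
// creates its rotation timer only for TimedRotationPolicy, so the
// cleanup() path no longer dereferences a null field.
FileRotationPolicy rotationPolicy =
        new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
```

Note you lose size-based rotation with this approach, so pick an interval that keeps files near your target size.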