ayvak

Reputation: 1

Storm topology not getting created in Storm UI

When a Storm job is submitted to the Hadoop cluster to write to HDFS using HdfsBolt, no topology appears in the Storm UI. The stack trace points into the storm-hdfs library (org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?]).

Error:

42608 [Thread-20-bolt-executor[3 3]] INFO  o.a.s.util - Async loop interrupted!
42608 [Thread-19-disruptor-executor[3 3]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor bolt:[3 3]
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor __acker:[1 1]
42608 [Thread-22-__acker-executor[1 1]] INFO  o.a.s.util - Async loop interrupted!
42608 [Thread-21-disruptor-executor[1 1]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor __acker:[1 1]
42608 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor __system:[-1 -1]
42608 [Thread-24-__system-executor[-1 -1]] INFO  o.a.s.util - Async loop interrupted!
42608 [Thread-23-disruptor-executor[-1 -1]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42609 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor __system:[-1 -1]
42609 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor kafka_spout:[5 5]
42609 [Thread-25-disruptor-executor[5 5]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42609 [Thread-26-kafka_spout-executor[5 5]] INFO  o.a.s.util - Async loop interrupted!
42611 [SLOT_1024] INFO  o.a.s.d.executor - Shut down executor kafka_spout:[5 5]
42611 [SLOT_1024] INFO  o.a.s.d.executor - Shutting down executor forwardToKafka:[4 4]
42611 [Thread-28-forwardToKafka-executor[4 4]] INFO  o.a.s.util - Async loop interrupted!
42611 [Thread-27-disruptor-executor[4 4]-send-queue] INFO  o.a.s.util - Async loop interrupted!
42612 [SLOT_1024] ERROR o.a.s.d.s.Slot - Error when processing event
java.lang.NullPointerException: null
    at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.cleanup(AbstractHdfsBolt.java:261) ~[f083f1dc515311e9868bcf07babd3298.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$fn__9739.invoke(executor.clj:878) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at clojure.lang.MultiFn.invoke(MultiFn.java:233) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$mk_executor$reify__9530.shutdown(executor.clj:437) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
    at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Reflector.invokeNoArgInstanceMember(Reflector.java:313) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify__10167$shutdown_STAR___10187.invoke(worker.clj:684) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.worker$fn__10165$exec_fn__1369__auto__$reify$reify__10213.shutdown(worker.clj:724) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.ProcessSimulator.killProcess(ProcessSimulator.java:67) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.LocalContainer.kill(LocalContainer.java:69) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.killContainerForChangedAssignment(Slot.java:311) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.handleRunning(Slot.java:527) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.stateMachineStep(Slot.java:265) ~[storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:752) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
42612 [SLOT_1024] ERROR o.a.s.u.Utils - Halting process: Error when processing an event
java.lang.RuntimeException: Halting process: Error when processing an event
    at org.apache.storm.utils.Utils.exitProcess(Utils.java:1814) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]
    at org.apache.storm.daemon.supervisor.Slot.run(Slot.java:796) [storm-core-1.2.1.3.1.0.0-78.jar:1.2.1.3.1.0.0-78]

The following is the Java code used; it is the main topology file. Data is collected from Kafka and written to HDFS through HdfsBolt. Part of the data does get stored in HDFS, but not all of the worker nodes participate, and the topology is not created in the Storm UI.

Java:

package hdpstrm.hdpstrm;

import java.io.File;
import java.io.FileOutputStream;
import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;

import org.apache.storm.kafka.spout.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Values;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import hdpstrm.hdpstrm.printBolt;

public class MyMain {

    private static HdfsBolt createHdfsBolt() {
        // use "|" instead of "," for field delimiter
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat();
        fileNameFormat.withPath("/user/march26");
        fileNameFormat.withPrefix("upcse"); // Files start with this prefix
        fileNameFormat.withExtension(".csv");

        return new HdfsBolt()
                .withFsUrl("hdfs://localhost:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }

    public static void main(String[] args) throws Exception {

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(1);
        config.put("ALLFILE", (Object)"/home/kdx/out2.txt");
        TopologyBuilder tp = new TopologyBuilder();
        String kafka_bootstrap = "localhost:6667";
        String kafka_topic = args[0];
        Builder<String, String> kafka_config = KafkaSpoutConfig.builder(kafka_bootstrap, kafka_topic).setGroupId("group_id");
        kafka_config.build().getKafkaProps().keySet();
        KafkaSpout<String, String> kafka_spout = new KafkaSpout<String, String>(kafka_config.build());

        tp.setSpout("kafka_spout", kafka_spout, 1);
        tp.setBolt("bolt", new printBolt()).shuffleGrouping("kafka_spout");
        tp.setBolt("forwardToKafka", createHdfsBolt(), 1).shuffleGrouping("bolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafkaTopology", config, tp.createTopology());

        //Wait for 40 seconds
        Thread.sleep(40000);

        //Stop the topology
        cluster.shutdown();
        System.out.println(" ******** TERMINATED THE LOCAL CLUSTER *********");

        StormSubmitter.submitTopologyWithProgressBar("MyMain", config, tp.createTopology());
    }
}

The expected result is that the topology is created in the Storm UI and that all worker nodes participate when the Storm jar is run.

Upvotes: 0

Views: 379

Answers (1)

Stig Rohde Døssing

Reputation: 3651

The error is due to a bug in storm-hdfs.

The line you're getting an error in is https://github.com/apache/storm/blob/v1.2.1/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java#L261

That variable is only initialized if the rotation policy is a TimedRotationPolicy, which yours isn't.
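The failure pattern can be reproduced in plain Java without Storm: a field that only one configuration branch initializes, dereferenced unconditionally during cleanup. The class and method names below are illustrative, not Storm's actual code.

```java
// Minimal illustration of the NPE pattern behind AbstractHdfsBolt.cleanup():
// a helper object is created only for one rotation-policy type, but
// cleanup() dereferences it unconditionally.
public class CleanupNpeDemo {

    public static class Bolt {
        private Thread rotationTimer; // only set for timed rotation

        public Bolt(boolean timedRotation) {
            if (timedRotation) {
                rotationTimer = new Thread(); // initialized only on this branch
            }
        }

        public void cleanup() {
            // NPE when rotation is size-based and the timer was never created
            rotationTimer.interrupt();
        }
    }

    public static void main(String[] args) {
        new Bolt(true).cleanup(); // fine: timer was initialized
        try {
            new Bolt(false).cleanup(); // size-based rotation path
        } catch (NullPointerException e) {
            System.out.println("NPE in cleanup, as at AbstractHdfsBolt.java:261");
        }
    }
}
```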

You can file a bug report at https://issues.apache.org/jira. PRs are also welcome at https://github.com/apache/storm.
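Until the bug is fixed, one possible workaround (a sketch against the storm-hdfs 1.2.1 API, not something I have tested on your cluster) is to use a `TimedRotationPolicy`, since that is the configuration for which the field dereferenced in `cleanup()` does get initialized:

```java
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

private static HdfsBolt createHdfsBolt() {
    // Rotate on time instead of size: the timer that cleanup()
    // dereferences is only created for timed rotation.
    FileRotationPolicy rotationPolicy =
            new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);

    DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat();
    fileNameFormat.withPath("/user/march26");
    fileNameFormat.withPrefix("upcse");
    fileNameFormat.withExtension(".csv");

    return new HdfsBolt()
            .withFsUrl("hdfs://localhost:8020")
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(new CountSyncPolicy(1000));
}
```

You lose size-based rotation with this change, so it is a stopgap rather than a fix.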

Upvotes: 2
