I'm currently trying to implement a Storm topology that integrates with the R language.
As a starting point, I took the R-Storm project (https://github.com/allenday/R-Storm). It extends Storm's ShellBolt class to run R code and provides an R library that handles communication between the Java and R sides.
My problem is this: if I build a topology from regular (Java-only) bolts, I can chain them together without issue. However, when one of the bolts in the middle of the chain is an R Shell Bolt, the topology fails with:
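For context, ShellBolt-based integrations talk to their subprocess over stdin/stdout using Storm's multilang protocol, where each message is a JSON object followed by a line containing only `end`. The sketch below is a simplified, hypothetical helper (`MultilangWriter`, `quote`, `emitMessage` and `send` are illustrative names, not part of Storm or R-Storm) that builds a minimal `emit` command, the kind of message the R side sends back to Storm, and writes it with that framing. It is written in Java only to keep one language across examples; it omits optional fields such as `anchors` and `stream` and does only minimal JSON escaping.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper illustrating multilang framing:
 * one JSON object, then a line containing only "end".
 */
public class MultilangWriter {

    // Minimal JSON string escaping: handles quotes, backslashes and newlines only.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                default:   sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Builds a minimal emit command carrying a tuple of string values.
    static String emitMessage(List<String> values) {
        StringBuilder sb = new StringBuilder("{\"command\":\"emit\",\"tuple\":[");
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(quote(values.get(i)));
        }
        return sb.append("]}").toString();
    }

    // Writes one framed message and flushes so the reader is not left waiting.
    static void send(Writer out, String json) throws IOException {
        out.write(json);
        out.write("\nend\n");
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        StringWriter out = new StringWriter();
        send(out, emitMessage(Arrays.asList("PERMUTE seven years ago and four score")));
        System.out.print(out);
    }
}
```

The explicit `flush()` matters with pipes: a message that sits in a buffer is, from the reading side, indistinguishable from no message at all.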
5661 [Thread-18] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: Pipe to subprocess seems to be broken! No output read.
Shell Process Exception:
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.daemon.executor$fn__3557$fn__3569$fn__3616.invoke(executor.clj:715) ~[storm-0.9.0-wip16.jar:na]
at backtype.storm.util$async_loop$fn__436.invoke(util.clj:377) ~[storm-0.9.0-wip16.jar:na]
at clojure.lang.AFn.run(AFn.java:24) ~[clojure-1.4.0.jar:na]
at java.lang.Thread.run(Unknown Source) ~[na:1.7.0_25]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: Pipe to subprocess seems to be broken! No output read.
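The wording of this error matches how a ShellBolt reads replies from its subprocess: it collects stdout lines until one equals `end`, and if stdout closes first it reports a broken pipe, adding "No output read." when nothing of the current message had arrived. The following is a simplified sketch of that framing loop, not Storm's actual source (`MultilangReader` and `readMessage` are hypothetical names). Under that reading, the error means the R process closed its output, for example because it exited, before answering, so whatever the R script writes to stderr is the first thing to check.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

/**
 * Simplified sketch of reading one multilang message from a subprocess.
 * Mirrors the behavior behind the "Pipe to subprocess" error; not Storm's code.
 */
public class MultilangReader {

    // Reads lines until a line equal to "end" and returns the accumulated message.
    static String readMessage(BufferedReader in) {
        StringBuilder msg = new StringBuilder();
        while (true) {
            String line;
            try {
                line = in.readLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (line == null) {
                // EOF before "end": the subprocess closed stdout mid-conversation
                String detail = msg.length() == 0
                        ? " No output read."
                        : " Currently read output: " + msg;
                throw new RuntimeException("Pipe to subprocess seems to be broken!" + detail);
            }
            if (line.equals("end")) {
                return msg.toString();
            }
            if (msg.length() > 0) {
                msg.append('\n');
            }
            msg.append(line);
        }
    }

    public static void main(String[] args) {
        // A well-formed message followed by the "end" terminator
        BufferedReader ok = new BufferedReader(new StringReader("{\"pid\": 1234}\nend\n"));
        System.out.println(readMessage(ok));

        // A subprocess that exited before writing anything
        BufferedReader dead = new BufferedReader(new StringReader(""));
        try {
            readMessage(dead);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```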
More concretely, the following topology works as expected:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("permutebolt", new PermuteBolt(), 1).shuffleGrouping("spout");
Here, PermuteBolt is an R Shell Bolt. The logs for this example show the expected output:
6246 [Thread-18] INFO backtype.storm.daemon.task - Emitting: spout default [four score and seven years ago]
6246 [Thread-16] INFO backtype.storm.daemon.executor - Processing received message source: spout:3, stream: default, id: {}, [four score and seven years ago]
6261 [Thread-23] INFO backtype.storm.daemon.task - Emitting: permutebolt default ["PERMUTE seven years ago and four score"]
If, however, I add another bolt that consumes the first bolt's output, such as:
builder.setBolt("permutebolt", new PermuteBolt(), 1).shuffleGrouping("spout");
builder.setBolt("identity", new IdentityBolt(new Fields("identity")), 1).fieldsGrouping("permutebolt", new Fields("permutation"));
it fails with the trace shown above. Oddly, this failing example is one that ships with the project.
Has anyone run into this before?
UPDATE: This only happens with R Shell Bolts. I have since tried bolts that run Python scripts, and those chain normally.
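In the failing topology, IdentityBolt subscribes with `fieldsGrouping("permutebolt", new Fields("permutation"))`, so tuples are routed by the value of the `permutation` field, which PermuteBolt has to declare as an output field. As a rough, hypothetical illustration of that routing (modeled on, not copied from, Storm's fields grouping, which hashes the selected values and takes the result modulo the number of consumer tasks), the sketch below picks a target task index for a tuple; `FieldsGroupingSketch` and `targetTask` are illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of fields-grouping routing: the values of the
 * grouping fields are hashed onto one of the consumer's tasks.
 */
public class FieldsGroupingSketch {

    // Returns the target task index for a tuple, given the producer's
    // declared output fields and the fields used for grouping.
    static int targetTask(List<String> declaredFields, List<Object> tuple,
                          List<String> groupFields, int numTasks) {
        List<Object> selected = new ArrayList<>();
        for (String field : groupFields) {
            int idx = declaredFields.indexOf(field);
            if (idx < 0) {
                // In this sketch, grouping on an undeclared field is rejected outright
                throw new IllegalArgumentException("Unknown grouping field: " + field);
            }
            selected.add(tuple.get(idx));
        }
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(selected.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        List<String> declared = Arrays.asList("permutation");
        List<Object> tuple = Arrays.<Object>asList("PERMUTE seven years ago and four score");
        int task = targetTask(declared, tuple, Arrays.asList("permutation"), 4);
        System.out.println("routed to task " + task);
    }
}
```

Because equal `permutation` values always hash to the same index, a given permutation string is always handled by the same IdentityBolt task; with a parallelism of 1, as in the example, every tuple goes to index 0.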
@andrei, this is fixed in version 1.01, uploaded to GitHub today: https://github.com/allenday/R-Storm/releases/tag/v1.01
It has been submitted to CRAN and will be available soon.
Thanks for reporting.
-Allen