Reputation: 393
public class WordCountTopology {
    private static final String FILTER_BOLT_ID = "filter-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";
    static final PropertiesConfiguration CONFIGURATION = new PropertiesConfiguration();

    // Load the topology settings from the classpath once, at class-load time.
    static {
        try {
            CONFIGURATION.load(ClassLoader.getSystemResourceAsStream(
                    "config/topo.properties"));
        } catch (ConfigurationException e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException("Failed to load topo properties", e);
        }
    }

    private static KafkaSpout createKafkaSpout() {
        // Build "host:port" entries from the comma-separated KAFKA_HOSTS setting.
        List<String> hostList = new ArrayList<String>();
        String kafkaPortStr = CONFIGURATION.getString("KAFKA_PORT");
        String[] kafkaHosts = CONFIGURATION.getString("KAFKA_HOSTS").split(",");
        for (String kafkaHost : kafkaHosts) {
            hostList.add(kafkaHost + ":" + kafkaPortStr);
        }
        BrokerHosts hosts = StaticHosts.fromHostString(
                hostList, CONFIGURATION.getInt("NUM_KAFKA_PARTITIONS"));
        SpoutConfig spoutConfig = new SpoutConfig(
                hosts,
                CONFIGURATION.getString("KAFKA_TOPIC"),
                CONFIGURATION.getString("ZOOKEEPER_STORAGE_PATH"),
                CONFIGURATION.getString("ZOOKEEPER_STORAGE_ID"));
        // -1 asks the spout to start from the latest offset.
        spoutConfig.forceStartOffsetTime(-1);
        return new KafkaSpout(spoutConfig);
    }
}
topo.properties (not full file):
Topology 1:
KAFKA_TOPIC=varnish
ZOOKEEPER_STORAGE_PATH=/kafkastorm
ZOOKEEPER_STORAGE_ID=discovery
Topology 2:
KAFKA_TOPIC=varnish
ZOOKEEPER_STORAGE_PATH=/kafkastorm8
ZOOKEEPER_STORAGE_ID=discovery8
I am trying to run two topologies that consume data from the same Kafka topic. Topology 1 works fine with code similar to the above, but topology 2 fails with an error.
I am getting the following error from the Kafka spout:
kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl
Upvotes: 0
Views: 463
Reputation: 393
The solution was to delete topology 2's ZooKeeper storage root path, /kafkastorm8, using the ZooKeeper CLI and then resubmit the topology. After that it worked fine.
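One plausible reason this helps, offered as an assumption rather than something confirmed in the thread: an offset left under the old ZooKeeper path may no longer fall inside the range the broker still holds, and a fetch outside that range is what raises OffsetOutOfRangeException. The sketch below only illustrates that range condition. The class and method names are hypothetical and are not part of storm-kafka or Kafka.

```java
public class OffsetRangeSketch {
    /**
     * Hypothetical helper: returns true when a fetch starting at storedOffset
     * falls inside the range the broker currently holds, [earliest, latest].
     */
    static boolean isFetchable(long storedOffset, long earliest, long latest) {
        return storedOffset >= earliest && storedOffset <= latest;
    }

    public static void main(String[] args) {
        // Made-up numbers: the broker currently holds offsets 1000..5000.
        long earliest = 1000L;
        long latest = 5000L;

        // A stale offset left in ZooKeeper that the broker no longer has.
        System.out.println(isFetchable(200L, earliest, latest));
        // An offset inside the retained range.
        System.out.println(isFetchable(3000L, earliest, latest));
    }
}
```

If the stored offset is outside the range, removing the stored state (as done above) lets the spout fall back to its configured start offset instead.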
Upvotes: 0
Reputation: 8161
Could you try changing the following and see if it works?
spoutConfig.forceStartOffsetTime(-1);
to
spoutConfig.forceStartOffsetTime(-2);
A value of -2 makes the spout start from the earliest offset still available on the broker, whereas -1 starts from the latest. For more info you can read this page.
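To make the two values less magic: in Kafka's offset requests, -1 is the sentinel for the latest offset and -2 is the sentinel for the earliest one. The small sketch below just spells that out. The class, constant, and method names are hypothetical and chosen for illustration; they are not the storm-kafka API.

```java
public class StartOffsetSketch {
    // Sentinel values used by Kafka offset requests.
    static final long LATEST_TIME = -1L;    // start at the end of the log
    static final long EARLIEST_TIME = -2L;  // start at the oldest retained message

    /** Hypothetical helper that describes what a given start offset time means. */
    static String describe(long startOffsetTime) {
        if (startOffsetTime == LATEST_TIME) {
            return "latest offset (only new messages)";
        }
        if (startOffsetTime == EARLIEST_TIME) {
            return "earliest offset still on the broker";
        }
        // Any other value is treated as a timestamp in milliseconds.
        return "offset looked up by timestamp " + startOffsetTime + " ms";
    }

    public static void main(String[] args) {
        System.out.println(describe(-1L));
        System.out.println(describe(-2L));
    }
}
```

Starting from the earliest offset replays everything the broker still retains, so it is worth checking that downstream bolts can cope with reprocessing.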
Upvotes: 2