Reputation: 76
I have a Dropwizard application that uses Flink to read from Kafka, but it fails with this exception on startup:
java -jar my-app.jar server my-config.yaml
[2018-01-04T01:04:24,577Z](main)([]) INFO - FlinkMiniCluster - Stopping FlinkMiniCluster.
[2018-01-04T01:04:24,591Z](main)([]) WARN - ROOT - unavailable
! com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:/my-app.jar!/reference.conf: 804: Could not resolve substitution to a value: ${akka.stream.materializer}
at com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:231)
at com.typesafe.config.impl.SimpleConfig.resolveWith(SimpleConfig.java:74)
at com.typesafe.config.impl.SimpleConfig.resolve(SimpleConfig.java:64)
at com.typesafe.config.impl.SimpleConfig.resolve(SimpleConfig.java:59)
at com.typesafe.config.impl.SimpleConfig.resolve(SimpleConfig.java:37)
at com.typesafe.config.impl.ConfigImpl$1.call(ConfigImpl.java:374)
at com.typesafe.config.impl.ConfigImpl$1.call(ConfigImpl.java:367)
at com.typesafe.config.impl.ConfigImpl$LoaderCache.getOrElseUpdate(ConfigImpl.java:65)
at com.typesafe.config.impl.ConfigImpl.computeCachedConfig(ConfigImpl.java:92)
at com.typesafe.config.impl.ConfigImpl.defaultReference(ConfigImpl.java:367)
at com.typesafe.config.ConfigFactory.defaultReference(ConfigFactory.java:413)
at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:307)
at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:683)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:245)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:288)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:263)
at akka.actor.ActorSystem$.create(ActorSystem.scala:191)
at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem(AkkaUtils.scala:106)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.startJobManagerActorSystem(FlinkMiniCluster.scala:300)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.singleActorSystem$lzycompute$1(FlinkMiniCluster.scala:329)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.org$apache$flink$runtime$minicluster$FlinkMiniCluster$$singleActorSystem$1(FlinkMiniCluster.scala:329)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:343)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$1.apply(FlinkMiniCluster.scala:341)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:341)
at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:323)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:107) ...
My Flink stream is pretty basic:
environment
    .addSource(new FlinkKafkaConsumer010<>(...))
    .name("source name 1")
    .union(environment.addSource(new FlinkKafkaConsumer010<>(...))
        .name("source name 2"))
    .map(new MyMapFunction())
    .addSink(new PrintSinkFunction<>())
    .name("Sink: Print");
Strangely enough, the application runs just fine and successfully creates a FlinkMiniCluster when debugging in IDEA.
I'm using Flink 1.4 and did not start a Flink JobManager when running from either IDEA or the command line.
Is there a configuration I need to be setting up to run from the command line?
Upvotes: 1
Views: 3616
Reputation: 654
This might be caused by forgetting to activate the build-jar build profile, as in
mvn clean install -Pbuild-jar
which is described in the Flink documentation.
Upvotes: 0
Reputation: 11
As CPS said, the reference.conf file in the shaded JAR has to be a merge of the reference.conf files from the separate Akka modules. To get that, add the following transformer to the shade plugin configuration:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.0.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <artifactSet>
              ...
            </artifactSet>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                <resource>reference.conf</resource>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
I ran into one small constraint. I also use the ManifestResourceTransformer to set the mainClass, and I had to order the two transformers with ManifestResourceTransformer first and AppendingTransformer second (otherwise ManifestResourceTransformer modifies the entry method of the AppendingTransformer).
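To make the idea of appending concrete, here is a minimal sketch of what merging same-named resources by concatenation amounts to. It is not the shade plugin's code; the class name AppendMergeSketch, the helper appendAll, and the two sample config strings are hypothetical and only illustrate that every module's defaults end up in one file instead of one copy overwriting the others.

```java
import java.util.Arrays;
import java.util.List;

public class AppendMergeSketch {

    /**
     * Concatenates the contents of several same-named resources, in order,
     * making sure each piece ends with a newline so entries do not run together.
     */
    static String appendAll(List<String> contents) {
        StringBuilder merged = new StringBuilder();
        for (String content : contents) {
            merged.append(content);
            if (!content.endsWith("\n")) {
                merged.append('\n');
            }
        }
        return merged.toString();
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for the reference.conf of two different modules.
        String firstModuleConf = "akka { actor { } }";
        String secondModuleConf = "akka { stream { materializer { } } }";

        // After appending, both modules' sections are present in one file.
        System.out.print(appendAll(Arrays.asList(firstModuleConf, secondModuleConf)));
    }
}
```

Without a merge step like this, a fat jar can hold only one file at the path reference.conf, so the defaults from the other modules would be missing at runtime.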
Upvotes: 1
Reputation: 537
This answer to a similar question solves the issue. The problem has nothing to do with Flink; it comes from how Akka configuration is handled. From the Akka documentation:
Akka’s configuration approach relies heavily on the notion of every module/jar having its own reference.conf file, all of these will be discovered by the configuration and loaded. Unfortunately this also means that if you put/merge multiple jars into the same jar, you need to merge all the reference.confs as well. Otherwise all defaults will be lost and Akka will not function.
Building the jar with the shade plugin, so that the reference.conf files are merged, solves the problem.
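One way to see the difference between the IDE run and the fat jar is to list every reference.conf visible on the classpath. The following is a minimal diagnostic sketch using only the JDK; the class name ReferenceConfCheck and the helper findResources are hypothetical. Assuming the explanation quoted above applies, running it from the IDE should list one copy per Akka module jar, while running it from an unmerged fat jar should list a single copy.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class ReferenceConfCheck {

    /** Returns every classpath location that provides a resource with the given name. */
    static List<URL> findResources(ClassLoader loader, String name) throws IOException {
        return Collections.list(loader.getResources(name));
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = ReferenceConfCheck.class.getClassLoader();
        List<URL> found = findResources(loader, "reference.conf");

        // Print how many copies are visible and where each one comes from.
        System.out.println("reference.conf copies visible: " + found.size());
        for (URL url : found) {
            System.out.println("  " + url);
        }
    }
}
```

If only one location is printed when the class is run from the packaged jar, the per-module files were probably not merged during packaging.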
Upvotes: 0
Reputation: 76
FYI: I determined that the Akka dependencies pulled in by Flink were not being recognized at runtime, so I added them manually to my application's POM:
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor_2.11</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-protobuf_2.11</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_2.11</artifactId>
  <version>${akka.version}</version>
</dependency>
Upvotes: 1