Pushpendra Jaiswal
Pushpendra Jaiswal

Reputation: 470

FlinkQueryableState: configuration issues on a local cluster

I am running flink from IDE. Storing data in the queryable is working, but somehow when I query it, it throws an exception.

Exception

Failure(akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://[email protected]:6123/), Path(/user/jobmanager)])

My code:

config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,"localhost")
config.setString(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,"6123")

@throws[Throwable]
def recover(failure: Throwable): Future[Array[Byte]] = if (failure.isInstanceOf[AssertionError]) return Futures.failed(failure)
else {
  // At startup some failures are expected
  // due to races. Make sure that they don't
  // fail this test.
  return Patterns.after(retryDelay, TEST_ACTOR_SYSTEM.scheduler, TEST_ACTOR_SYSTEM.dispatcher, new Callable[Future[Array[Byte]]]() {
    @throws[Exception]
    def call: Future[Array[Byte]] = return getKvStateWithRetries(queryName, key, serializedKey)
  })
}
}

  @SuppressWarnings(Array("unchecked"))
  private def getKvStateWithRetries(queryName: String,
                                keyHash: Int,
                                serializedKey: Array[Byte]): Future[Array[Byte]] = {

val kvState = client.getKvState(jobID, queryName, keyHash, serializedKey)
kvState.recoverWith(recover(queryName, keyHash, serializedKey))
  }

def onSuccess = new OnSuccess[Array[Byte]]() {
@throws(classOf[Throwable])
override def onSuccess(result: Array[Byte]): Unit = {
  println("found record ")
  val value = KvStateRequestSerializer.deserializeValue(result, valueSerializer)
  println(value)
 }
}


override def invoke(query: QueryMetaData): Unit = {
println("getting inside querystore"+query.record)
val serializedResult = flinkQuery.getResult(query.record, queryName)
serializedResult.onSuccess(onSuccess) 

I am not spawning a new mini-cluster or cluster.submit like https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java as I want to this in the same cluster in the same environment as main app running with env.execute. Is that step necessary.

From the documentation by default flink runs at localhost:6123 Is there problem with connection? Do I need to submit job in separate cluster?

Upvotes: 3

Views: 1263

Answers (1)

Diego Lins de Freitas
Diego Lins de Freitas

Reputation: 625

After a lot of googling i found a solution.

I am using LocalStreamEnvironment and getting the same error, Until a found this thread RemoteEnv connect failed. The error described is for a different setup(not locally) but the gist example contained in the topic used for testing is creating the LocalFlinkMiniCluster with the parameter "useSingleActorSystem" set to false.

Looking at the implementation of LocalStreamEnvironment the MiniCluster is created with "useSingleActorSystem" set to true.

I simply created a class LocalQueryableStreamEnvironment extending LocalStreamEnvironment where the mini cluster is created with "useSingleActorSystem" set to true, and everything is working from IDE.

Now my code is as follow:

Configuration:

Configuration config = new Configuration();
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 6);
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
config.setInteger(JobManagerOptions.WEB_PORT, JobManagerOptions.WEB_PORT.defaultValue());
config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT,JobManagerOptions.PORT.defaultValue());
**config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);**

NOTE: QueryableState only works with this config LOCAL_NUMBER_TASK_MANAGER set to value more then 1!

Instantiate/execute environment:

LocalQueryableStreamEnvironment env = LocalQueryableStreamEnvironment.createLocalEnvironment(3, config);
...
env.addSource(anySource)
   .keyby(anyAtribute)
   .flatmap(new UpdateMyStateToBeQueriedLaterMapper())
   .addSink(..); //etc
...
env.execute("JobNameHere");

And to create the client:

final Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, JobManagerOptions.PORT.defaultValue());

HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
    .createHighAvailabilityServices(
                   config, 
                   Executors.newSingleThreadScheduledExecutor(),
                   HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION
    );
return new QueryableStateClient(config,highAvailabilityServices);

For more info access:

Queryable States in ApacheFlink - Implementation

Queryable State Client with 1.3.0-rc0

My dependencies:

compile group: 'org.apache.flink', name: 'flink-java', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-jdbc', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-cep_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
compile 'org.apache.flink:flink-runtime-web_2.11:1.3.1'

Upvotes: 4

Related Questions