Reputation: 241
We need to perform a series of asynchronous jobs, each of which queries Elasticsearch and then kicks off processing of the query result.
To distribute these jobs across several machines with Apache Spark's Standalone Cluster mode, we have used the RDD.foreachPartition code pattern explained at Spark Streaming -- foreachRDD, as well as at Spark Streaming connection pool in each JVM and RestAPI service call from Spark Streaming.
Though it seems like the pattern should extend to Elasticsearch, we have not found any specific examples in blog-land of doing something like:
job_list_RDD.foreachPartition(RDDpartition => {
  val PartitionClient = Connection.conn()
  RDDpartition.foreach(hostmetric => {
    hostmetric._2 match {
      case "cpu" => {
        generateCPUPlots(PartitionClient, guidid, hostmetric._1, hostmetric._3, lab_index)
      }
      case "mem" => {
        generateMemPlots(PartitionClient, guidid, hostmetric._1, lab_index)
      }
      case _ => {
        logger.debug("Unexpected metric")
      }
    }
  })
  PartitionClient.close() // SHOULD THIS LINE BE REMOVED ?
})
when the connection is an Elasticsearch PreBuiltTransportClient:
import java.net.InetAddress
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient

object Connection {
  // Builds a new transport client pointed at a single node.
  def conn() = {
    val port = 9300
    val NodeName = "node4"
    val settings = (new MySettings).settings
    val client = new PreBuiltTransportClient(settings)
      .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(NodeName), port))
    client
  }
}

class MySettings extends Serializable {
  val port = 9300
  val NodeName = "node4"
  val ESClusterName = "sparcplugs"
  val nodes = List(NodeName)
  val addresses = nodes.map { host => new InetSocketTransportAddress(InetAddress.getByName(host), port) }
  val DefaultTimeout = "30s"
  // The builder itself is not serializable, so keep it out of the serialized state.
  @transient lazy val settingBuilder = Settings.builder()
  val settings = settingBuilder
    .put("cluster.name", ESClusterName)
    // The time to wait for a ping response from a node. Defaults to 5s.
    .put("client.transport.ping_timeout", DefaultTimeout)
    .build()
}
The PreBuiltTransportClient and Settings.builder() are part of the Java API bundled with org.elasticsearch:elasticsearch-5.5.1.jar, org.elasticsearch.client:transport-5.5.1.jar and org.elasticsearch.plugin:transport-netty4-client-5.5.1.jar.
Note that the MySettings class was needed to create a serializable wrapper around the org.elasticsearch.common.settings.Settings.Builder object (the value of Settings.builder()), which otherwise caused object-not-serializable exceptions when a PreBuiltTransportClient was built within the job_list_RDD.foreachPartition loop.
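To illustrate the wrapper idea in isolation, here is a minimal sketch using plain Java serialization and no Spark or Elasticsearch classes. FakeBuilder, Wrapper and TransientDemo are hypothetical names standing in for Settings.Builder and MySettings; the sketch only shows that a @transient lazy val is skipped during serialization and rebuilt on first access afterwards.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a builder that does not implement Serializable.
class FakeBuilder {
  def build(): String = "built"
}

// Serializable wrapper: the builder field is transient, so it is not written
// out with the object, and lazy, so it is recreated on first use.
class Wrapper extends Serializable {
  @transient lazy val builder: FakeBuilder = new FakeBuilder
}

object TransientDemo {
  // Serializes a value to bytes and reads it back, roughly what happens when
  // a closure's captured state is shipped to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new Wrapper
    original.builder // force initialization before serializing
    val copy = roundTrip(original)
    println(copy.builder.build())
  }
}
```

Without the @transient annotation, the same round trip would be expected to fail with a NotSerializableException once the builder had been initialized.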
As it stands, this code executes with no serialization exceptions and will perform SOME but NOT ALL of the jobs, i.e. it does not fully iterate job_list_RDD.
Before iteration completes these two exceptions are thrown (in this order):
[ERROR] 2018-03-21 16:16:58,907 org.apache.spark.util.SparkUncaughtExceptionHandler logError - Uncaught exception in thread Thread[elasticsearch[_client_][generic][T#3],5,main] java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;
[ERROR] 2018-03-21 16:25:50,505 com.thales.sparc.logger.DefaultLog$ generatePlots - org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 38, 192.168.200.105, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
The message in the second exception, "Remote RPC client disassociated. Likely due to containers exceeding thresholds", seems to suggest that Spark Worker resources are being overrun, which in turn may point to bad management of Elasticsearch client connections, but these messages are the only feedback we can find in the logs.
Oddly, if the line
PartitionClient.close()
(in the first code segment) is commented out, the program completes without exception; however, the jobs are still not all completed (job_list_RDD is still not fully iterated). Closing the connection opened for the RDD partition is part of the code pattern explained in Spark Streaming -- foreachRDD.
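For reference, the open/use/close shape of the per-partition pattern can be sketched without Spark or Elasticsearch. FakeClient and handlePartition are hypothetical names, and the Iterator stands in for one RDD partition. The only change from the code above is that close() sits in a finally block, so a failing job cannot leak the connection. This is a sketch of the pattern only and is not expected to address the Netty error by itself.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for an Elasticsearch client; it records each call so
// the open/use/close order can be inspected afterwards.
class FakeClient(log: ListBuffer[String]) extends AutoCloseable {
  log += "open"

  def process(job: String): Unit = {
    if (job == "bad") throw new IllegalStateException(s"cannot process $job")
    log += s"process:$job"
  }

  override def close(): Unit = {
    log += "close"
    ()
  }
}

object PartitionPattern {
  // Mirrors the function passed to foreachPartition: one client per partition,
  // always closed once the partition's iterator has been consumed or has failed.
  def handlePartition(jobs: Iterator[String], log: ListBuffer[String]): Unit = {
    val client = new FakeClient(log)
    try jobs.foreach(job => client.process(job))
    finally client.close()
  }

  def main(args: Array[String]): Unit = {
    val log = ListBuffer.empty[String]
    handlePartition(Iterator("cpu", "mem"), log)
    println(log.mkString(", "))
  }
}
```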
This same RDD.foreachPartition code pattern has worked fine for me when applied to MySQL sessions. Has anyone else tried this with Elasticsearch?
Would a Connection Pool help here? If so, do you have any specific examples of Java or Scala code for a Connection Pool with Elasticsearch?
Thanks ahead of time.
Upvotes: 0
Views: 843
Reputation: 241
It turns out that the Elasticsearch transport client's use of the Netty package (org.elasticsearch.plugin:transport-netty4-client-5.5.1.jar) is known to conflict with Apache Spark's own use of Netty. This is stated by Elasticsearch developer James Baiera (Serializing Elasticsearch clients).
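A NoSuchMethodError like the one on io.netty.bootstrap.Bootstrap.config() is typically what you see when a different version of a library is loaded than the one the caller was compiled against. One way to check which jar a class actually comes from is sketched below. ClasspathProbe and locationOf are hypothetical helper names; the calls inside are standard JDK reflection.

```scala
import scala.util.Try

object ClasspathProbe {
  // Returns the location (usually a jar) a class was loaded from, or None if
  // the class cannot be found or reports no code source.
  def locationOf(className: String): Option[String] =
    for {
      cls    <- Try(Class.forName(className)).toOption
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString

  def main(args: Array[String]): Unit = {
    val name = "io.netty.bootstrap.Bootstrap"
    println(s"$name -> ${locationOf(name).getOrElse("not found on this classpath")}")
  }
}
```

The classpath on the executors may differ from the driver's, so the call would need to run inside a task to show what the executors see.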
The resolution was to use the Elasticsearch REST client instead of the transport client (Java REST Client). This required updating our Elasticsearch installation, since a full Java REST API was not supported until version 5.6.
No foreachPartition pattern was then necessary, nor was it necessary to explicitly make the REST client serializable (with "extends Serializable"). The working code pattern was:
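import org.apache.http.HttpHost
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.client.{RestClient, RestHighLevelClient}
import org.elasticsearch.search.builder.SearchSourceBuilder

class RestManager {
  val client: RestHighLevelClient = new RestHighLevelClient(
    RestClient.builder(new HttpHost("micah", 9200, "http")))
}

// Singleton holder: the client is created on first access in each JVM.
object myRestClient extends RestManager

job_list_RDD.foreach(job => {
  ..
  myRestClient.client.search(new SearchRequest(ES_index)
    .source(new SearchSourceBuilder().query( .. )))
  ..
})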
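A likely reason no Serializable wrapper is needed here is that a top-level Scala object is not captured by the closure. It is referenced statically and initialized on first access in whichever JVM runs the code. The sketch below shows that behaviour in plain Scala; FakeRestClient, InitCounter and SharedClient are hypothetical names, and the Seq stands in for the jobs handled by one executor.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the hypothetical client is constructed.
object InitCounter {
  val count = new AtomicInteger(0)
}

// Hypothetical stand-in for RestHighLevelClient.
class FakeRestClient {
  InitCounter.count.incrementAndGet()
  def search(query: String): String = s"hits for $query"
}

// Same shape as `object myRestClient extends RestManager`: the client is
// built once, when the object is first touched in this JVM.
object SharedClient {
  val client = new FakeRestClient
}

object SingletonDemo {
  def runJobs(jobs: Seq[String]): Seq[String] =
    jobs.map(job => SharedClient.client.search(job))

  def main(args: Array[String]): Unit = {
    val results = runJobs(Seq("cpu", "mem", "disk"))
    results.foreach(println)
    println(s"clients constructed: ${InitCounter.count.get}")
  }
}
```

Note that in this shape the shared client is never explicitly closed; it lives as long as the JVM that created it.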
Upvotes: 0