azdatasci
azdatasci

Reputation: 841

Getting Elasticsearch Data into HDFS Easily

We have had an Elasticsearch cluster on premise for almost 2 years now and wanted to do some more advanced analytics and such with the log data contained there as well as other disparate data sources.

Our focus is Syslogs which are in Elasticsearch. Each day generates ~100gb of syslog data - each day is it's own index. We have some applicaiton logs too, but if I can solve this problem for syslog, I can easily solve it for other data movement issues.

Which leads me to my question. For my analysis, we are using Spark 2.1.1 with the Python API. I'd like to have all of the syslog data, for say, 2 weeks in HDFS so we can do two things:

  1. Avoid latency with communicating between our Spark/Hadoop Cluster
  2. Speed up things on our machine learning jobs
  3. Down the road I want to start using Parquet for my data, so if I have the data being pulled from ES, I can do whatever I want to with it later.

Now, my question - what is the best method for pulling such large amounts of data from ES and putting it in HDFS? I have an example in PySpark of doing some basic queries, but when I try and pull an entire index (100gb daily generated index) into an RDD, I get out of memory errors. I have reached out to Elasticsearch support, but am being told this is a problem I need to solve on the Hadoop/Spark side and they don't support that.

We have setup the "ES-Hadoop Connector", which does give me some framework to work from, though understanding the documentation is really a challenge. There are connectors for several of the components of the Hadoop ecosystem (HIVE, Spark, Hadoop, etc.). I'm not sure if there is a solution there, or if there is something better to do. I'm new to this, so excuse any questions that have obvious answers. I'm looking for some guidance and some specific recommendations (pointers to specific examples with setup and code would be amazing, if possible). My goals for this are:

  1. Get ~2 weeks worth of syslogs in HDFS (I'd like this to be a rolling 2 weeks)
  2. Create a minimal load on the Elasticsearch system
  3. Whatever methods there are, it would be nice to automate this so every day a new index is ingested and the oldest is removed. This is not a hard requirement, but just a nice to have.

Thanks for any help, suggestions or examples you can point me toward.

EDIT / ADDITIONAL INFO:

I wanted to add some code here to explain what I am trying to do. The process is taking an extremely long time to complete, and even after hours, doesn't show an progress, so I'm wondering if I am doing something wrong.

Here is how I launch Py Spark:

pyspark --jars=/sysadmin/hadoop/elasticsearch-hadoop-5.6.3/dist/elasticsearch-hadoop-5.6.3.jar --master yarn --deploy-mode client --num-executors 10 --executor-cores 4 --executor-memory 8G --driver-memory 50G

Then, I do a couple of things, I set the esconf, create the RDD and then try and just save it as text to HDFS:

>>> esconf = {"es.net.http.auth.user":"XXXXX","es.net.http.auth.pass":"XXXXX","es.resource":"logstash-syslog-2017.10.11", "es.query":"?q=*","es.read.field.include":"message","es.nodes":"server0005","es.net.ssl":"true"}
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=esconf)
>>> rdd.saveAsTextFile("/user/spark/logstash-syslog-2017.10.11.txt") 

Now, the RDD comes back and if I do a take(1) from the RDD, it takes a while, but I can get back the top 10 results. On that 10 record set, I can save it, works like a charm. On the full RDD, though, this is just taking forever. I'm not really sure what I should expect, but I can't imagine on a 10-node cluster with 64gb of RAM and 8 cores per box that this should take hours.

Upvotes: 4

Views: 2891

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191701

I have an example in PySpark of doing some basic queries, but when I try and pull an entire index (100gb daily generated index) into an RDD, I get out of memory errors

Spark doesn't allocate much memory to your jobs by default, so yes, when dealing with that much data, you'll get OOM errors.

Here's the key properties you should be concerned with and their defaults.

  • spark.dynamicAllocation.enabled - false
  • spark.executor.instances - 2
  • spark.executor.memory - 1g
  • spark.driver.cores - 1

If your Spark jobs are running under YARN cluster management, you also need to consider your YARN container sizes. When running in cluster mode, the Application Master will be the Spark driver container. In my experience, unless your Spark code is calling collect() to send data back through the driver, it doesn't need that much memory itself.

I would try first increasing the Executor memory, and then the number of executors. If you enable dynamic allocation, then you can consider not specifying the executor amount, but it does set a lower boundary to start with.

ES-Hadoop provides many connectors to extract data, but it all comes down to preference. If you know SQL, use Hive. Pig is simpler to run than Spark. Spark is very memory heavy, which might not work well in some clusters.

You mention NiFi in your comments, but that is still a Java process, and prone to OOM errors. Unless you have a NiFi cluster, you'll have a single process somewhere pulling 100 GB through a FlowFile on disk before writing to HDFS.

If you need a Snapshot of a whole Index, Elasticsearch provides HDFS support for such a feature. I'm unsure what data format that is, though, or if Hadoop processes can read it.

Upvotes: 1

Related Questions