petroslamb

Elasticsearch-Hadoop formatting multi resource writes issue

I am interfacing Elasticsearch with Spark using the Elasticsearch-Hadoop plugin, and I am having difficulty writing a DataFrame with a timestamp column to Elasticsearch.

The problem occurs when I write using dynamic/multi-resource formatting to create a daily index.

From the relevant documentation I get the impression that this is possible; however, the Python example below fails unless I change the column type of my DataFrame to date.

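import pyspark
from pyspark.sql import SparkSession
from datetime import datetime, timedelta

conf = pyspark.SparkConf()
conf.set('spark.jars', 'elasticsearch-spark-20_2.11-6.1.2.jar')
conf.set('es.nodes', '127.0.0.1:9200')
conf.set('es.read.metadata', 'true')
conf.set('es.nodes.wan.only', 'true')

# Build the session from the conf above (the snippet uses `spark` below)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

now = datetime.now()
before = now - timedelta(days=1)
after = now + timedelta(days=1)
cols = ['idz', 'name', 'time']
# Python datetime values are inferred as a Spark TimestampType column
vals = [(0, 'maria', before), (1, 'lolis', after)]
time_df = spark.createDataFrame(vals, cols)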

When I try to write, I use the following:

time_df.write.mode('append').format(
    'org.elasticsearch.spark.sql'
).options(
    **{'es.write.operation': 'index' }
).save('xxx-{time|yyyy.MM.dd}/1')

Unfortunately, this fails with the following error:

.... Caused by: java.lang.IllegalArgumentException: Invalid format: "2018-03-04 12:36:12.949897" is malformed at " 12:36:12.949897"
    at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)

On the other hand, it works fine if I use dates when creating the DataFrame:

cols = ['idz', 'name', 'time']
vals = [(0,'maria', before.date()), (1, 'lolis', after.date())]  
time_df = spark.createDataFrame(vals, cols)
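My reading of the error is that the index formatter parses the field value as an ISO 8601 date with optional time: the rejected string has a space where ISO 8601 puts a `T`, while a bare date has no time part at all and parses. A quick stdlib check illustrates the difference. The regex and the `looks_iso8601` helper are my own rough approximation of that shape, not the parser ES-Hadoop actually uses.

```python
import re
from datetime import datetime

# Rough approximation (an assumption, not ES-Hadoop's parser) of an
# ISO 8601 "date, optional time" value: YYYY-MM-DD, optionally followed
# by 'T' HH:MM[:SS[.fraction]]
ISO_DATE_OPTIONAL_TIME = re.compile(r"\d{4}-\d{2}-\d{2}(T\d{2}:\d{2}(:\d{2}(\.\d+)?)?)?")

def looks_iso8601(value: str) -> bool:
    """Return True if the whole string matches the approximate ISO 8601 shape."""
    return ISO_DATE_OPTIONAL_TIME.fullmatch(value) is not None

ts = datetime(2018, 3, 4, 12, 36, 12, 949897)

print(str(ts), looks_iso8601(str(ts)))                # 2018-03-04 12:36:12.949897 False
print(ts.isoformat(), looks_iso8601(ts.isoformat()))  # 2018-03-04T12:36:12.949897 True
print(str(ts.date()), looks_iso8601(str(ts.date())))  # 2018-03-04 True
```

The first line has the same shape as the value in the error message, and the last matches the date-only case that works.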

Is it possible to use a DataFrame timestamp column to write to daily indexes with this method, without also keeping a separate date column around? What about monthly indexes?
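For clarity, this is the routing I expect: each row should land in an index named after its own `time` value. The stdlib sketch below computes those names. `target_index` is a hypothetical helper, and the monthly variant assumes that a pattern such as `xxx-{time|yyyy.MM}/1` would be accepted the same way, which I have not confirmed.

```python
from datetime import datetime

# strftime equivalents of the date patterns used inside {time|...}
PATTERNS = {
    "daily": "%Y.%m.%d",  # {time|yyyy.MM.dd}
    "monthly": "%Y.%m",   # {time|yyyy.MM} (assumed to behave the same way)
}

def target_index(prefix: str, ts: datetime, granularity: str = "daily") -> str:
    """Hypothetical helper: the index name a row with timestamp `ts` should go to."""
    return f"{prefix}-{ts.strftime(PATTERNS[granularity])}"

ts = datetime(2018, 3, 4, 12, 36, 12, 949897)
print(target_index("xxx", ts))             # xxx-2018.03.04
print(target_index("xxx", ts, "monthly"))  # xxx-2018.03
```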

PySpark version: Spark 2.2.1, using Scala 2.11.8, OpenJDK 64-Bit Server VM 1.8.0_151

Elasticsearch version: number "6.2.2", build_hash "10b1edd", build_date "2018-02-16T19:01:30.685723Z", build_snapshot false, lucene_version "7.2.1", minimum_wire_compatibility_version "5.6.0", minimum_index_compatibility_version "5.0.0"
