Abdurrahman Adebiyi

Reputation: 510

flume-kite-morphline: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input: expected close marker for OBJECT

While working with Flume (1.6 and 1.7), I am getting the error below:

2016-12-02 00:57:11,634 (pool-3-thread-1) [WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!
2016-12-02 00:57:11,777 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:280)] FATAL: Spool Directory source r2: { spoolDir: /home/h/flume/forex }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries
 at [Source: java.io.ByteArrayInputStream@319bed83; line: 1, column: 4097]
    at org.kitesdk.morphline.base.FaultTolerance.handleException(FaultTolerance.java:73)
    at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.process(MorphlineHandlerImpl.java:136)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.intercept(MorphlineInterceptor.java:163)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.intercept(MorphlineInterceptor.java:152)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor.intercept(MorphlineInterceptor.java:74)
    at org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:148)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:258)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.kitesdk.morphline.api.MorphlineRuntimeException: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries
 at [Source: java.io.ByteArrayInputStream@319bed83; line: 1, column: 4097]
    at org.kitesdk.morphline.stdio.AbstractParser.doProcess(AbstractParser.java:98)
    at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)
    at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:186)
    at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)
    at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.process(MorphlineHandlerImpl.java:130)
    ... 13 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries
 at [Source: java.io.ByteArrayInputStream@319bed83; line: 1, column: 4097]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWS(UTF8StreamJsonParser.java:2547)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:709)
    at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:217)
    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:63)
    at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:14)
    at com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:189)
    at org.kitesdk.morphline.json.ReadJsonBuilder$ReadJson.doProcess(ReadJsonBuilder.java:110)
    at org.kitesdk.morphline.stdio.AbstractParser.doProcess(AbstractParser.java:96)
    ... 17 more

My agent pipeline ingests JSON data from a spooldir source, then extracts and transforms it using a morphline interceptor.

See excerpts of the Flume config below:

#GENERIC
a1.channels = mem-channel
a1.sources = r2
a1.sinks = k2 k3 

#CHANNEL
a1.channels.mem-channel.type = memory
a1.channels.mem-channel.capacity = 10000000
a1.channels.mem-channel.transactionCapacity = 1000

#SRC
a1.sources.r2.type = spooldir 
a1.sources.r2.channels = mem-channel
a1.sources.r2.spoolDir = /path/to/some/directory
a1.sources.r2.interceptors = morphline

#INTERCEPTOR
a1.sources.r2.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.r2.interceptors.morphline.morphlineFile =/some/morphline/forex.conf
a1.sources.r2.interceptors.morphline.morphlineId = convertJsonToCSV

#SINK
a1.sinks.k2.type = logger
a1.sinks.k2.channel = mem-channel

a1.sinks.k3.type = file_roll
a1.sinks.k3.channel = mem-channel
a1.sinks.k3.sink.directory = /tmp/flume
a1.sinks.k3.batchSize = 1

Upvotes: 1

Views: 986

Answers (1)

Abdurrahman Adebiyi

Reputation: 510

Root cause analysis (RCA)

The answer was staring at me all along. The issue was neither the morphline configuration nor the input JSON file. A closer look at the Flume WARN line

[WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!

pointed to the solution. By default, the spooldir source uses the LINE deserializer, which has a parameter deserializer.maxLineLength with a default value of 2048. The Flume User Guide defines this parameter as:

Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event

The JSON objects could not be parsed because any line longer than 2048 characters was truncated while the file was being spooled, so the morphline JSON parser (ReadJson in the stack trace) received an incomplete JSON byte array.
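To make the failure mode concrete, here is a minimal Python sketch that only imitates the behaviour quoted from the User Guide; it is not Flume's implementation. The helper names and the sample record are hypothetical. It cuts one long JSON line into events of at most 2048 characters and shows that the first event is no longer parseable JSON.

```python
import json

# Default of deserializer.maxLineLength, as quoted from the Flume User Guide.
MAX_LINE_LENGTH = 2048


def split_like_line_deserializer(line, max_len=MAX_LINE_LENGTH):
    """Rough imitation (an assumption, not Flume code): a line longer than
    max_len is cut, and the remainder shows up in subsequent events."""
    return [line[i:i + max_len] for i in range(0, len(line), max_len)]


def is_valid_json(text):
    """Return True if text parses as a complete JSON document."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# Hypothetical single-line JSON record, well over 2048 characters long.
record = json.dumps({"pair": "EUR/USD", "ticks": list(range(1000))})

# With the default limit the record is spread over several events.
events = split_like_line_deserializer(record)
print("record length:", len(record))
print("events with default limit:", len(events))
print("first event parses as JSON:", is_valid_json(events[0]))

# With a limit larger than the record, the line stays in one event.
whole = split_like_line_deserializer(record, max_len=10000)
print("events with limit 10000:", len(whole))
print("single event parses as JSON:", is_valid_json(whole[0]))
```

The first event ends in the middle of the object, which matches the "Unexpected end-of-input" reported by Jackson in the stack trace.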

Solution

I increased the value of deserializer.maxLineLength to 10000 (to be safe, since the number of characters per line in the JSON file could grow in the future).
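For reference, this is roughly how the change looks in the agent config from the question. It is a sketch that reuses the a1 and r2 names from that config. The explicit deserializer line is optional, since LINE is already the default, and 10000 is simply the value I chose; it should be at least as large as the longest line expected in the spooled files.

```
#SRC
a1.sources.r2.type = spooldir
a1.sources.r2.channels = mem-channel
a1.sources.r2.spoolDir = /path/to/some/directory
# LINE is the default deserializer; stated here only for clarity
a1.sources.r2.deserializer = LINE
# Raise the per-event character limit from the default of 2048
a1.sources.r2.deserializer.maxLineLength = 10000
```

As the error message itself says, the agent has to be restarted or reconfigured after the FATAL error before the source resumes processing.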

Upvotes: 1
