While working with Flume (1.6 and 1.7), I am getting the error below:
2016-12-02 00:57:11,634 (pool-3-thread-1) [WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!
2016-12-02 00:57:11,777 (pool-3-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:280)] FATAL: Spool Directory source r2: { spoolDir: /home/h/flume/forex }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.kitesdk.morphline.api.MorphlineRuntimeException: org.kitesdk.morphline.api.MorphlineRuntimeException: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries
at [Source: java.io.ByteArrayInputStream@319bed83; line: 1, column: 4097]
at org.kitesdk.morphline.base.FaultTolerance.handleException(FaultTolerance.java:73)
at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.process(MorphlineHandlerImpl.java:136)
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.intercept(MorphlineInterceptor.java:163)
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.intercept(MorphlineInterceptor.java:152)
at org.apache.flume.sink.solr.morphline.MorphlineInterceptor.intercept(MorphlineInterceptor.java:74)
at org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:148)
at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:258)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.kitesdk.morphline.api.MorphlineRuntimeException: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries
at [Source: java.io.ByteArrayInputStream@319bed83; line: 1, column: 4097]
at org.kitesdk.morphline.stdio.AbstractParser.doProcess(AbstractParser.java:98)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)
at org.kitesdk.morphline.base.AbstractCommand.doProcess(AbstractCommand.java:186)
at org.kitesdk.morphline.base.AbstractCommand.process(AbstractCommand.java:161)
at org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl.process(MorphlineHandlerImpl.java:130)
... 13 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input within/between OBJECT entries
at [Source: java.io.ByteArrayInputStream@319bed83; line: 1, column: 4097]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWS(UTF8StreamJsonParser.java:2547)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:709)
at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:217)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:63)
at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:14)
at com.fasterxml.jackson.databind.MappingIterator.nextValue(MappingIterator.java:189)
at org.kitesdk.morphline.json.ReadJsonBuilder$ReadJson.doProcess(ReadJsonBuilder.java:110)
at org.kitesdk.morphline.stdio.AbstractParser.doProcess(AbstractParser.java:96)
... 17 more
My agent pipeline ingests JSON data from a spooldir source, then extracts and transforms it using a morphline interceptor.
See the excerpts of the Flume config below:
#GENERIC
a1.channels = mem-channel
a1.sources = r2
a1.sinks = k2 k3
#CHANNEL
a1.channels.mem-channel.type = memory
a1.channels.mem-channel.capacity = 10000000
a1.channels.mem-channel.transactionCapacity = 1000
#SRC
a1.sources.r2.type = spooldir
a1.sources.r2.channels = mem-channel
a1.sources.r2.spoolDir = /path/to/some/directory
a1.sources.r2.interceptors = morphline
#INTERCEPTOR
a1.sources.r2.interceptors.morphline.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.r2.interceptors.morphline.morphlineFile = /some/morphline/forex.conf
a1.sources.r2.interceptors.morphline.morphlineId = convertJsonToCSV
#SINK
a1.sinks.k2.type = logger
a1.sinks.k2.channel = mem-channel
a1.sinks.k3.type = file_roll
a1.sinks.k3.channel = mem-channel
a1.sinks.k3.sink.directory = /tmp/flume
a1.sinks.k3.batchSize = 1
RCA
The answer was staring at me all along. The issue was neither the morphline configuration nor the input JSON file. A closer look at the Flume WARN line
[WARN - org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:143)] Line length exceeds max (2048), truncating line!
pointed to the solution. By default, the spooldir source uses the LINE deserializer, which has a parameter deserializer.maxLineLength with a default value of 2048. The Flume User Guide defines this parameter as:
Maximum number of characters to include in a single event. If a line exceeds this length, it is truncated, and the remaining characters on the line will appear in a subsequent event
The JSON objects could not be parsed because each over-long line was truncated while the file was being spooled, so the morphline's readJson command received an incomplete JSON byte array.
Solution
I increased the value of deserializer.maxLineLength to 10000 (to be safe, since the lines in the JSON file could grow longer in the future).
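For reference, a minimal sketch of that change as it would look in the agent config from the question, assuming the same a1 agent and r2 source names. The explicit deserializer line is optional, since LINE is already the default, and 10000 is only the value chosen here; it should exceed the longest line expected in the spooled files.

```properties
#SRC
a1.sources.r2.type = spooldir
a1.sources.r2.channels = mem-channel
a1.sources.r2.spoolDir = /path/to/some/directory
# Optional: LINE is the default deserializer for the spooldir source
a1.sources.r2.deserializer = LINE
# Raise the per-event character limit (default 2048) so that each
# single-line JSON object reaches the morphline interceptor intact
a1.sources.r2.deserializer.maxLineLength = 10000
```

The agent has to be restarted or reconfigured for the new limit to take effect, as the FATAL message in the log indicates.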