Shimon Benattar

Reputation: 173

Filtering log files in Flume using interceptors

I have an HTTP server writing log files, which I then load into HDFS using Flume. First, I want to filter events according to data in the header or body. I read that I can do this using an interceptor with a regex. Can someone explain exactly what I need to do? Do I need to write Java code that overrides the Flume code?

Also, I would like to send data to a different sink depending on the header (e.g. source=1 goes to sink1 and source=2 goes to sink2). How is this done?

Thank you,

Shimon

Upvotes: 3

Views: 11137

Answers (2)

Yanpeng

Reputation: 91

You can use Flume channel selectors to route events to different destinations. Alternatively, you can chain several Flume agents together to implement more complex routing, but chained agents are harder to maintain (resource usage and topology). You can also have a look at the flume-ng router sink, which may provide the functionality you want.

First, add the fields you want to route on to the event headers using a Flume interceptor:

a1.sources = r1 r2
a1.channels = c1 c2
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = datacenter
a1.sources.r1.interceptors.i1.value = NEW_YORK
a1.sources.r2.channels =  c2
a1.sources.r2.type = seq
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = datacenter
a1.sources.r2.interceptors.i2.value = BERKELEY

Then, you can set up your Flume channel selector like this:

a2.sources = r2
a2.channels = c1 c2 c3 c4
a2.sources.r2.channels = c1 c2 c3 c4
a2.sources.r2.selector.type = multiplexing
a2.sources.r2.selector.header = datacenter
a2.sources.r2.selector.mapping.NEW_YORK = c1
a2.sources.r2.selector.mapping.BERKELEY = c2 c3
a2.sources.r2.selector.default = c4

Or, you can set up an Avro router sink like this:

agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
agent.sinks.routerSink.hostname = test_host
agent.sinks.routerSink.port = 34541
agent.sinks.routerSink.channel = memoryChannel

# Set sink name
agent.sinks.routerSink.component.name = AvroRouterSink

# Set header name for routing
agent.sinks.routerSink.condition = datacenter

# Set routing conditions
agent.sinks.routerSink.conditions = east,west
agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
agent.sinks.routerSink.conditions.east.then.hostname = east_host
agent.sinks.routerSink.conditions.east.then.port = 34542
agent.sinks.routerSink.conditions.west.if = ^BERKELEY
agent.sinks.routerSink.conditions.west.then.hostname = west_host
agent.sinks.routerSink.conditions.west.then.port = 34543

Upvotes: 0

Dmitry

Reputation: 2993

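You don't need to write Java code to filter events. Use the Regex Filtering Interceptor to filter events whose body text matches a regular expression. With excludeEvents = true, matching events are dropped; with the default (false), only matching events are kept: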

agent.sources.logs_source.interceptors = regex_filter_interceptor
agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
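
As a concrete sketch, assuming the log lines contain a level token such as DEBUG: the interceptor name drop_debug and the pattern are only illustrative, while the agent and source names are the placeholders used above.

```properties
# Drop every event whose body contains DEBUG (example pattern, adjust to your log format)
agent.sources.logs_source.interceptors = drop_debug
agent.sources.logs_source.interceptors.drop_debug.type = regex_filter
agent.sources.logs_source.interceptors.drop_debug.regex = .*DEBUG.*
# true  = discard events that match the regex
# false = keep only events that match the regex (the default)
agent.sources.logs_source.interceptors.drop_debug.excludeEvents = true
```

Several interceptors can be listed space-separated on the interceptors line; they are applied in the order given.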

To route events based on headers, use the Multiplexing Channel Selector:

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

Here, events with the header "state"="CZ" go to channel "c1", events with "state"="US" go to both "c2" and "c3", and all other events go to "c4".
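
Applied to the case in the question, a minimal sketch might look like the following. It assumes events already carry a header named source with the value 1 or 2 (set by the client or by an interceptor). The agent, channel, and sink names and the HDFS paths are placeholders, and the source's own type and settings are left out.

```properties
a1.sources = r1
a1.channels = c1 c2 c3
a1.sinks = sink1 sink2 sink3

# Route on the value of the "source" header (source type/settings omitted)
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = source
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2
a1.sources.r1.selector.default = c3

a1.channels.c1.type = memory
a1.channels.c2.type = memory
a1.channels.c3.type = memory

# Each sink drains exactly one channel; the paths are hypothetical
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.channel = c1
a1.sinks.sink1.hdfs.path = /flume/source1
a1.sinks.sink2.type = hdfs
a1.sinks.sink2.channel = c2
a1.sinks.sink2.hdfs.path = /flume/source2
a1.sinks.sink3.type = hdfs
a1.sinks.sink3.channel = c3
a1.sinks.sink3.hdfs.path = /flume/other
```

The selector only chooses channels; which sink an event ends up in is decided by which sink is attached to that channel.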

This way you can also filter events by header: just route the specific header value to a channel that is drained by a Null Sink.
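
A sketch of that idea, reusing agent a1 and channel c1 from the selector example above (the sink name drop_sink is made up for illustration):

```properties
# Events with state=CZ are routed to c1 by the selector and then discarded
# (a1.sinks must list every sink of the agent; only the dropping sink is shown)
a1.sinks = drop_sink
a1.sinks.drop_sink.type = null
a1.sinks.drop_sink.channel = c1
```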

Upvotes: 11
