Filtering log files in Flume using interceptors


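You don't need to write Java code to filter events. Use the Regex Filtering Interceptor to filter events whose body text matches a regular expression. With excludeEvents = true, matching events are dropped; with the default (false), only matching events are kept: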

    agent.sources.logs_source.interceptors = regex_filter_interceptor
    agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
    agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = <your regex>
    agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
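
As a concrete illustration, here is a minimal sketch that drops noisy log lines. The pattern and the log levels it targets are assumptions chosen for the example, not something from the original setup; the agent and source names are reused from the snippet above.

```properties
# Hypothetical example: discard every event whose body contains DEBUG or TRACE.
agent.sources.logs_source.interceptors = regex_filter_interceptor
agent.sources.logs_source.interceptors.regex_filter_interceptor.type = regex_filter
# Assumed pattern; replace with whatever identifies unwanted lines in your logs.
agent.sources.logs_source.interceptors.regex_filter_interceptor.regex = .*(DEBUG|TRACE).*
agent.sources.logs_source.interceptors.regex_filter_interceptor.excludeEvents = true
```

The pattern is wrapped in `.*` so that it behaves the same for single-line bodies whether the whole body or only part of it is tested against the regex.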

To route events based on headers, use the Multiplexing Channel Selector:

    a1.sources = r1
    a1.channels = c1 c2 c3 c4
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = state
    a1.sources.r1.selector.mapping.CZ = c1
    a1.sources.r1.selector.mapping.US = c2 c3
    a1.sources.r1.selector.default = c4

Here, events with the header "state" = "CZ" go to channel "c1", events with "state" = "US" go to both "c2" and "c3", and all other events go to "c4".

This way you can also filter events by header: route the unwanted header value to a channel that is drained by a Null Sink.
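
A minimal sketch of that discard path, assuming "c4" from the selector example is the channel that receives the unwanted events. The sink name "k4" and the memory channel type are assumptions made for illustration.

```properties
# Hypothetical discard path: everything routed to c4 is thrown away.
# Only the discard sink is shown; list your real sinks on this line as well.
a1.sinks = k4

# Assumed channel type; any channel implementation works here.
a1.channels.c4.type = memory

# The Null Sink discards every event it takes from its channel.
a1.sinks.k4.type = null
a1.sinks.k4.channel = c4
```

Mapping a specific header value to "c4" (instead of relying on the default) would discard only events carrying that value.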


You can use Flume channel selectors to simply route events to different destinations, or you can chain several Flume agents together to implement more complex routing. Chained agents, however, become harder to maintain, both in resource usage and in the overall Flume topology. You can also have a look at the flume-ng router sink, which may provide the functionality you want.

First, add specific fields to the event header with a Flume interceptor:

    a1.sources = r1 r2
    a1.channels = c1 c2
    a1.sources.r1.channels = c1
    a1.sources.r1.type = seq
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    a1.sources.r1.interceptors.i1.key = datacenter
    a1.sources.r1.interceptors.i1.value = NEW_YORK
    a1.sources.r2.channels = c2
    a1.sources.r2.type = seq
    a1.sources.r2.interceptors = i2
    a1.sources.r2.interceptors.i2.type = static
    a1.sources.r2.interceptors.i2.key = datacenter
    a1.sources.r2.interceptors.i2.value = BERKELEY

Then you can set up your Flume channel selector like this:

    a2.sources = r2
    a2.channels = c1 c2 c3 c4
    a2.sources.r2.selector.type = multiplexing
    a2.sources.r2.selector.header = datacenter
    a2.sources.r2.selector.mapping.NEW_YORK = c1
    a2.sources.r2.selector.mapping.BERKELEY = c2 c3
    a2.sources.r2.selector.default = c4
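
For the selector on a2 to see the "datacenter" header, the events tagged by a1 have to reach a2's source. One common way to connect two agents is an Avro sink on the sender feeding an Avro source on the receiver, which carries event headers along with the body. The sketch below assumes that approach; the sink names (k1, k2), the host name, and the port are hypothetical and not part of the original configuration.

```properties
# Agent a1 (sender): drain both channels into Avro sinks that point at a2.
a1.sinks = k1 k2
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = collector.example.com
a1.sinks.k1.port = 4545
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c2
a1.sinks.k2.hostname = collector.example.com
a1.sinks.k2.port = 4545

# Agent a2 (receiver): r2 is an Avro source listening on the same port.
a2.sources.r2.type = avro
a2.sources.r2.bind = 0.0.0.0
a2.sources.r2.port = 4545
# The source must be attached to every channel the selector can route to.
a2.sources.r2.channels = c1 c2 c3 c4
```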

Or you can set up the avro-router sink like this:

    agent.sinks.routerSink.type = com.datums.stream.AvroRouterSink
    agent.sinks.routerSink.hostname = test_host
    agent.sinks.routerSink.port = 34541
    agent.sinks.routerSink.channel = memoryChannel
    # Set sink name
    agent.sinks.routerSink.component.name = AvroRouterSink
    # Set header name for routing
    agent.sinks.routerSink.condition = datacenter
    # Set routing conditions
    agent.sinks.routerSink.conditions = east,west
    agent.sinks.routerSink.conditions.east.if = ^NEW_YORK
    agent.sinks.routerSink.conditions.east.then.hostname = east_host
    agent.sinks.routerSink.conditions.east.then.port = 34542
    agent.sinks.routerSink.conditions.west.if = ^BERKELEY
    agent.sinks.routerSink.conditions.west.then.hostname = west_host
    agent.sinks.routerSink.conditions.west.then.port = 34543