Neelam

Reputation: 37

Unable to insert data into HBase using Apache Flume

I am using Flume to insert log file data into an HBase table, but nothing is inserted into the table. The Flume agent configuration is as follows:

agent1.sources = tail
agent1.channels = memoryChannel
agent1.sinks = loggerSink sink1
agent1.sources.tail.type = exec
agent1.sources.tail.command = tail -f /usr/local/jarsfortest/LogsForTest/generatingLogs-app.logs
agent1.sources.tail.channels = memoryChannel

agent1.sinks.loggerSink.channel = memoryChannel
agent1.sinks.loggerSink.type = logger

agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sink1.channel = memoryChannel
agent1.sinks.sink1.table = testFlume
agent1.sinks.sink1.columnFamily = log
agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#agent1.sinks.sink1.serializer.regex = [a-zA-Z0-9]*[^C][a-zA-Z0-9]*[^C][a-zA-Z0-9]*
agent1.sinks.sink1.serializer.regex =[a-zA-Z0-9]*^C[a-zA-Z0-9]*^C[a-zA-Z0-9]*
agent1.sinks.sink1.serializer.colNames = id, no_fill_reason, bid

agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 1000

The agent above starts successfully, but the log file data is not inserted into HBase. The log file data looks like this: id0^COK^C10, i.e. the fields are separated by a control character. Please help me. Thanks in advance.

Upvotes: 0

Views: 750

Answers (2)

Ram Ghadiyaram

Reputation: 29237

The reason may be that the regular expression is not matching. To debug this, follow the steps below.

1) Start only one agent, with the following debug options:

flume-ng agent -n $1 -c ../../config/conf/ -f ../../config/conf/$1.conf -Xmx3g -Xdebug -Xrunjdwp:transport=dt_socket,address=1044,server=y,suspend=y \
  --classpath ../lib/*:../../config/conf/zoo.cfg:../.

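Once the script starts, the JVM waits for a debugger to attach (because of suspend=y) and prints a message such as "listening to 1044...".

2) In Eclipse, create a Remote Java Application debug configuration that connects to your server's host name on port 1044.

3) The getActions method is responsible for putting the rows into HBase.

Put a breakpoint in the getActions method of the event serializer (here, RegexHbaseEventSerializer):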

public List<Row> getActions() throws FlumeException {
        List<Row> actions = Lists.newArrayList();

        // The regex must match the whole event body; otherwise no row is written.
        Matcher m = this.inputPattern.matcher(new String(this.payload, this.charset));
        if (!(m.matches())) {
            return Lists.newArrayList();
        }

        // The number of capture groups must equal the number of column names.
        if (m.groupCount() != this.colNames.size()) {
            return Lists.newArrayList();
        }
        try {
            byte[] rowKey;
            if (this.rowKeyIndex < 0) {
                // No row key column configured: generate a key.
                rowKey = getRowKey();
            } else {
                rowKey = m.group(this.rowKeyIndex + 1).getBytes(Charsets.UTF_8);
            }
            Put put = new Put(rowKey);

            // Each capture group becomes one column value.
            for (int i = 0; i < this.colNames.size(); ++i) {
                if (i != this.rowKeyIndex) {
                    put.add(this.cf, (byte[]) this.colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
                }
            }
            if (this.depositHeaders) {
                // Optionally store the event headers as additional columns.
                for (Map.Entry entry : this.headers.entrySet()) {
                    put.add(this.cf, ((String) entry.getKey()).getBytes(this.charset),
                                    ((String) entry.getValue()).getBytes(this.charset));
                }
            }
            actions.add(put);
        } catch (Exception e) {
            throw new FlumeException("Could not get row key!", e);
        }
        return actions;
    }
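
The two early returns in getActions can also be checked outside Flume, without a debugger. The following is a minimal sketch, not Flume code: the class RegexCheck and the method wouldProduceRow are hypothetical names, and it assumes the separator in the log file is the single control character 0x03 (what a terminal displays as ^C) and that three columns are configured. It only mirrors the matches() and groupCount() checks shown above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {

    // Mirrors the two early-return checks in getActions():
    // the regex must match the whole line, and the number of
    // capture groups must equal the number of column names.
    static boolean wouldProduceRow(String regex, String line, int columnCount) {
        Matcher m = Pattern.compile(regex).matcher(line);
        return m.matches() && m.groupCount() == columnCount;
    }

    public static void main(String[] args) {
        // Assumption: fields are separated by the control character 0x03.
        String line = "id0\u0003OK\u000310";

        // Regex from the question: '^' followed by 'C' is an anchor plus a letter,
        // and there are no capture groups.
        String original = "[a-zA-Z0-9]*^C[a-zA-Z0-9]*^C[a-zA-Z0-9]*";

        // Matches the control character, but still has no capture groups.
        String ungrouped = "[a-zA-Z0-9]*\\x03[a-zA-Z0-9]*\\x03[a-zA-Z0-9]*";

        // One capture group per column (id, no_fill_reason, bid).
        String grouped = "([a-zA-Z0-9]*)\\x03([a-zA-Z0-9]*)\\x03([a-zA-Z0-9]*)";

        System.out.println(wouldProduceRow(original, line, 3));  // prints false
        System.out.println(wouldProduceRow(ungrouped, line, 3)); // prints false
        System.out.println(wouldProduceRow(grouped, line, 3));   // prints true
    }
}
```

If the grouped form is the one that passes for your data, the serializer regex probably needs one pair of parentheses per entry in colNames. How the control character has to be written in the Flume properties file is not covered by this sketch.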

Upvotes: 0

cdhit

Reputation: 1454

I can help with analysing the root cause.

Press CTRL+C to terminate the flume-ng process.

Watch the output carefully. There will be three types of metrics:

  • Shutdown Metric for type: SOURCE
  • Shutdown Metric for type: CHANNEL
  • Shutdown Metric for type: SINK

For example:

Shutdown Metric for type: SOURCE, name: r1. source.start.time == 1483838106878
Shutdown Metric for type: SOURCE, name: r1. source.stop.time == 1483838118766
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 141
Shutdown Metric for type: SOURCE, name: r1. src.events.received == 147
Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0

Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1483838106874
Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1483838118789
Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 1000000
Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 147
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 31
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 26

Shutdown Metric for type: SINK, name: k1. sink.start.time == 1483838108891
Shutdown Metric for type: SINK, name: k1. sink.stop.time == 1483838118758
Shutdown Metric for type: SINK, name: k1. sink.batch.complete == 0
Shutdown Metric for type: SINK, name: k1. sink.batch.empty == 4
Shutdown Metric for type: SINK, name: k1. sink.batch.underflow == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.closed.count == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.creation.count == 0
Shutdown Metric for type: SINK, name: k1. sink.connection.failed.count == 0
Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 26
Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0

In the example above, the problem is in the sink stage: the sink attempted to drain 26 events (sink.event.drain.attempt == 26), but none were written successfully (sink.event.drain.sucess == 0).
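
The same reading of the shutdown metrics can be done programmatically. The sketch below is an illustration only: the class ShutdownMetrics, its methods, and the rule used to pick a stage are my own assumptions, not part of Flume. It assumes the log lines have the "Shutdown Metric for type: ..., name: ... key == value" shape shown above and that the agent has a single source, channel and sink, since metrics are keyed by name only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ShutdownMetrics {

    // Captures: component type, component name, metric key, numeric value.
    private static final Pattern LINE = Pattern.compile(
            "Shutdown Metric for type: (\\w+), name: (\\S+)\\. (\\S+) == (\\d+)");

    // Collects metric key -> value from the shutdown log lines.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> metrics = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                metrics.put(m.group(3), Long.parseLong(m.group(4)));
            }
        }
        return metrics;
    }

    // Hypothetical rule of thumb: report the first stage where events stop flowing.
    static String suspectStage(Map<String, Long> metrics) {
        long accepted = metrics.getOrDefault("src.events.accepted", 0L);
        long attempted = metrics.getOrDefault("sink.event.drain.attempt", 0L);
        // "sucess" is spelled this way in the metric name shown in the log.
        long drained = metrics.getOrDefault("sink.event.drain.sucess", 0L);
        if (accepted == 0) return "SOURCE";
        if (attempted == 0) return "CHANNEL";
        if (drained < attempted) return "SINK";
        return "OK";
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 141",
                "Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 26",
                "Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0");
        System.out.println(suspectStage(parse(lines))); // prints SINK
    }
}
```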

Upvotes: 0
