Reputation: 37
I am using Flume to insert log file data into an HBase table, but nothing is inserted into the table. The Flume agent configuration is as follows:
agent1.sources = tail
agent1.channels = memoryChannel
agent1.sinks = loggerSink sink1
agent1.sources.tail.type = exec
agent1.sources.tail.command = tail -f /usr/local/jarsfortest/LogsForTest/generatingLogs-app.logs
agent1.sources.tail.channels = memoryChannel
agent1.sinks.loggerSink.channel = memoryChannel
agent1.sinks.loggerSink.type = logger
agent1.sinks.sink1.type = org.apache.flume.sink.hbase.HBaseSink
agent1.sinks.sink1.channel = memoryChannel
agent1.sinks.sink1.table = testFlume
agent1.sinks.sink1.columnFamily = log
agent1.sinks.sink1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#agent1.sinks.sink1.serializer.regex = [a-zA-Z0-9]*[^C][a-zA-Z0-9]*[^C][a-zA-Z0-9]*
agent1.sinks.sink1.serializer.regex =[a-zA-Z0-9]*^C[a-zA-Z0-9]*^C[a-zA-Z0-9]*
agent1.sinks.sink1.serializer.colNames = id, no_fill_reason, bid
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 1000
The above agent starts successfully, but the log file data is not inserted into HBase. The log file data looks like this: id0^COK^C10, i.e. the fields are separated by a control character. Please help me. Thanks in advance.
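One thing worth checking: the `^C` shown in the log data is usually the single ETX control character (0x03) as displayed by a terminal, not the literal two characters `^` and `C`, so a regex containing a literal `^C` would never match. A minimal sketch (my assumption about the separator, not confirmed by the question) of a pattern that captures the three fields around 0x03 separators:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    // \x03 is the ETX control character that a terminal displays as ^C.
    static final Pattern P =
            Pattern.compile("([^\\x03]+)\\x03([^\\x03]+)\\x03([^\\x03]+)");

    // Returns true when the payload splits into exactly three capture groups.
    static boolean matches(String payload) {
        Matcher m = P.matcher(payload);
        return m.matches() && m.groupCount() == 3;
    }

    public static void main(String[] args) {
        String sample = "id0\u0003OK\u000310"; // id0^COK^C10 with real control chars
        System.out.println(matches(sample));   // expect: true
    }
}
```

If a pattern like this matches your data, the equivalent would go into `agent1.sinks.sink1.serializer.regex`; note that a Flume properties file may eat single backslashes, so the escape may need doubling (`\\x03`). The spaces in the `colNames` value (`id, no_fill_reason, bid`) may also end up inside the column names, so removing them is worth trying as well.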
Upvotes: 0
Views: 750
Reputation: 29237
The reason may be that the regular expression is not matching. To debug this, follow the steps below:
1) Start only one agent, with the remote-debug options added (note that the JDWP option string must not contain spaces):
flume-ng agent -n $1 -c ../../config/conf/ -f ../../config/conf/$1.conf -Xmx3g -Xdebug -Xrunjdwp:transport=dt_socket,address=1044,server=y,suspend=y
--classpath ../lib/*:../../config/conf/zoo.cfg:../.
Once you start the script, a message like "Listening for transport dt_socket at address: 1044" will appear.
2) Create an Eclipse remote debug configuration that connects to your server's hostname on port 1044.
3) The getActions method of the event serializer is responsible for putting the rows into HBase. Set a breakpoint in getActions:
public List<Row> getActions() throws FlumeException {
    List<Row> actions = Lists.newArrayList();
    Matcher m = this.inputPattern.matcher(new String(this.payload, this.charset));
    if (!m.matches()) {
        return Lists.newArrayList();
    }
    if (m.groupCount() != this.colNames.size()) {
        return Lists.newArrayList();
    }
    try {
        byte[] rowKey;
        if (this.rowKeyIndex < 0) {
            rowKey = getRowKey();
        } else {
            rowKey = m.group(this.rowKeyIndex + 1).getBytes(Charsets.UTF_8);
        }
        Put put = new Put(rowKey);
        for (int i = 0; i < this.colNames.size(); ++i) {
            if (i != this.rowKeyIndex) {
                put.add(this.cf, (byte[]) this.colNames.get(i),
                        m.group(i + 1).getBytes(Charsets.UTF_8));
            }
        }
        if (this.depositHeaders) {
            for (Map.Entry entry : this.headers.entrySet()) {
                put.add(this.cf, ((String) entry.getKey()).getBytes(this.charset),
                        ((String) entry.getValue()).getBytes(this.charset));
            }
        }
        actions.add(put);
    } catch (Exception e) {
        throw new FlumeException("Could not get row key!", e);
    }
    return actions;
}
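The two early return paths above are exactly why mismatches fail silently: a non-matching pattern, or a group count that differs from the number of configured columns, both yield an empty action list, and nothing reaches HBase. A hypothetical sketch of those two guard clauses in isolation (the method name is illustrative, not Flume API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class GroupCountCheck {
    // Mirrors the two guard clauses in getActions: both must pass
    // before any Put would be built.
    static boolean wouldProduceActions(String regex, String payload,
                                       List<String> colNames) {
        Matcher m = Pattern.compile(regex).matcher(payload);
        if (!m.matches()) {
            return false;                          // first guard: pattern mismatch
        }
        return m.groupCount() == colNames.size();  // second guard: column count
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "no_fill_reason", "bid");
        // Two capture groups for three columns -> event silently dropped
        System.out.println(wouldProduceActions("(\\w+),(\\w+)", "a,b", cols));
        // Three groups for three columns -> rows would be written
        System.out.println(wouldProduceActions("(\\w+),(\\w+),(\\w+)", "a,b,c", cols));
    }
}
```

Checking your serializer regex against a sample line this way, outside Flume, is much faster than redeploying the agent for each attempt.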
Upvotes: 0
Reputation: 1454
I can help with analyzing the root cause.
Use CTRL+C to terminate the flume-ng process, then watch the output carefully. There will be three types of metrics: SOURCE, CHANNEL, and SINK.
For example:
Shutdown Metric for type: SOURCE, name: r1. source.start.time == 1483838106878
Shutdown Metric for type: SOURCE, name: r1. source.stop.time == 1483838118766
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append-batch.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.accepted == 0
Shutdown Metric for type: SOURCE, name: r1. src.append.received == 0
Shutdown Metric for type: SOURCE, name: r1. src.events.accepted == 141
Shutdown Metric for type: SOURCE, name: r1. src.events.received == 147
Shutdown Metric for type: SOURCE, name: r1. src.open-connection.count == 0
Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1483838106874
Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1483838118789
Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 1000000
Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 147
Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 141
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 31
Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 26
Shutdown Metric for type: SINK, name: k1. sink.start.time == 1483838108891
Shutdown Metric for type: SINK, name: k1. sink.stop.time == 1483838118758
Shutdown Metric for type: SINK, name: k1. sink.batch.complete == 0
Shutdown Metric for type: SINK, name: k1. sink.batch.empty == 4
Shutdown Metric for type: SINK, name: k1. sink.batch.underflow == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.closed.count == 1
Shutdown Metric for type: SINK, name: k1. sink.connection.creation.count == 0
Shutdown Metric for type: SINK, name: k1. sink.connection.failed.count == 0
Shutdown Metric for type: SINK, name: k1. sink.event.drain.attempt == 26
Shutdown Metric for type: SINK, name: k1. sink.event.drain.sucess == 0
In the above example, the issue is in the sink stage, because sink.event.drain.sucess is 0 even though 26 drains were attempted.
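The same reading can be done mechanically: a stage that attempted work but succeeded zero times has failed outright, while a small attempt/success gap is often just in-flight events at shutdown. A minimal sketch of that rule (my own helper, not a Flume API), using the counter values from the output above:

```java
public class MetricTriage {
    // A stage has completely failed when it attempted work
    // but succeeded zero times.
    static boolean stageFailed(long attempts, long successes) {
        return attempts > 0 && successes == 0;
    }

    public static void main(String[] args) {
        // Channel put: 141 of 147 attempts succeeded -> not a failure.
        System.out.println(stageFailed(147, 141)); // false
        // Sink drain: 0 of 26 attempts succeeded -> the sink is the problem.
        System.out.println(stageFailed(26, 0));    // true
    }
}
```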
Upvotes: 0