Reputation: 247
I have a simple bolt that reads data from a Kafka spout and should then write the data to an HDFS directory. The problem is that the bolt does not write until the cluster is stopped. How can I ensure that, as the bolt reads a tuple from the Kafka spout, it immediately writes it to HDFS or, at least, writes every 'n' entries? (I am using CDH 4.4, Hadoop 2.0.)
The Java for the bolt:
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt10 extends BaseRichBolt {

    private OutputCollector collector;
    private String values;
    Configuration configuration = null;
    FileSystem hdfs = null;
    FSDataOutputStream outputStream = null;
    BufferedWriter br = null;
    List<String> valList;
    String machineValue;
    int upTime;
    int downTime;
    int idleTime;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        upTime = 0;
        downTime = 0;
        idleTime = 0;
        this.collector = collector;
        String timeStamp = new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime());
        try {
            // Open a per-worker output file on HDFS when the bolt is prepared
            configuration = new Configuration();
            configuration.set("fs.defaultFS", "hdfs://localhost.localdomain:8020");
            hdfs = FileSystem.get(configuration);
            outputStream = hdfs.create(new Path("/tmp/storm/StormHdfs/machine10_" + timeStamp + ".txt"));
            br = new BufferedWriter(new OutputStreamWriter(outputStream, "UTF-8"));
            br.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void execute(Tuple tuple) {
        // Parse the machine id and status code out of the tuple's string form
        values = tuple.toString();
        int start = values.indexOf('[');
        int end = values.indexOf(']');
        machineValue = values.substring(start + 1, end);
        String machine = machineValue.substring(0, machineValue.indexOf(','));
        String code = machineValue.substring(machineValue.indexOf(',') + 1);
        int codeInt = Integer.parseInt(code);
        if (codeInt == 0) idleTime += 30;
        else if (codeInt == 1) upTime += 30;
        else downTime += 30;
        String finalMessage = machine + " " + "upTime(s): " + upTime + " " + "idleTime(s): " + idleTime + " " + "downTime: " + downTime;
        try {
            br.write(finalMessage); // *This is the writing part into HDFS*
            br.write('\n');
            br.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    public void cleanup() {}
}
Upvotes: 3
Views: 3705
Reputation: 17
You should use the HdfsBolt to write data into HDFS, with the configuration described in the other answer. However, instead of setting the SyncPolicy count to 1000, set it to a small number (e.g. 10-20) for testing purposes, because that number determines how many tuples must be emitted before the buffered data is synced to HDFS. For example, if you configure
SyncPolicy syncPolicy = new CountSyncPolicy(10);
then you will be able to see the data you have inserted into Kafka after every 10 messages.
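For reference, here is a minimal sketch of how that smaller sync count might slot into the builder from the other answer, with the namenode URL and output path borrowed from the question (adjust both to your environment):
// Sync the data buffer to HDFS after every 10 tuples so test data shows up quickly
SyncPolicy syncPolicy = new CountSyncPolicy(10);
HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost.localdomain:8020")  // namenode URL from the question
        .withFileNameFormat(new DefaultFileNameFormat().withPath("/tmp/storm/StormHdfs"))
        .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
        .withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB))
        .withSyncPolicy(syncPolicy);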
Upvotes: -1
Reputation: 5541
EDIT: completely changed my answer.
You need to use the HdfsBolt rather than relying on writing to the file yourself. Using the HdfsBolt takes away all of the complication of working out when to flush to files, opening buffered streams, etc. See http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.3/bk_user-guide/content/ch_storm-using-hdfs-connector.html, but the bits you're interested in are:
// Use pipe as record boundary
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

// Synchronize the data buffer with the filesystem every 1000 tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// Rotate data files when they reach five MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/foo");

// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);
Then simply pass your data from your current bolt into this one.
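For example, a rough sketch of the wiring, assuming PrinterBolt10 is changed to declare and emit its finalMessage rather than writing to HDFS itself (the component names, parallelism and kafkaSpout variable here are placeholders):
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 1);
// PrinterBolt10 now emits the formatted message downstream instead of writing it
builder.setBolt("printer-bolt", new PrinterBolt10(), 1).shuffleGrouping("kafka-spout");
// bolt is the HdfsBolt configured above; it decides when to sync and rotate files
builder.setBolt("hdfs-bolt", bolt, 1).shuffleGrouping("printer-bolt");
With this wiring the HdfsBolt owns all the HDFS handling, so PrinterBolt10 no longer needs the FileSystem, FSDataOutputStream or BufferedWriter fields at all.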
Upvotes: 3