Reputation: 21
I am trying to read a small RCFile (~200 rows of data) into a HashMap to do a map-side join, but I'm having a lot of trouble getting the data in the file into a usable state.
Here is what I have so far, most of which is lifted from this example:
public void configure(JobConf job)
{
    try
    {
        FileSystem fs = FileSystem.get(job);
        RCFile.Reader rcFileReader = new RCFile.Reader(fs, new Path("/path/to/file"), job);
        int counter = 1;

        while (rcFileReader.next(new LongWritable(counter)))
        {
            System.out.println("Fetching data for row " + counter);
            BytesRefArrayWritable dataRead = new BytesRefArrayWritable();
            rcFileReader.getCurrentRow(dataRead);
            System.out.println("dataRead: " + dataRead + " dataRead.size(): " + dataRead.size());

            for (int i = 0; i < dataRead.size(); i++)
            {
                BytesRefWritable bytesRefRead = dataRead.get(i);
                byte[] b1 = bytesRefRead.getData();
                Text returnData = new Text(b1);
                System.out.println("READ-DATA = " + returnData.toString());
            }
            counter++;
        }
    }
    catch (IOException e)
    {
        throw new Error(e);
    }
}
However, in the output I am getting, all of the data for each column is concatenated together in the first row, and there is no data in any of the other rows.
Fetching data for row 1
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@7f26d3df dataRead.size(): 5
READ-DATA = 191606656066860670
READ-DATA = United StatesAmerican SamoaGuamNorthern Mariana Islands
READ-DATA = USASGUMP
READ-DATA = USSouth PacificSouth PacificSouth Pacific
READ-DATA = 19888
Fetching data for row 2
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@1cb1a4e2 dataRead.size(): 0
Fetching data for row 3
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@52c00025 dataRead.size(): 0
Fetching data for row 4
dataRead: org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable@3b49a794 dataRead.size(): 0
How do I read in this data properly so that I have access to one row at a time, e.g. (191, United States, US, US, 19)?
Upvotes: 2
Views: 3497
Reputation: 449
Due to the columnar nature of RCFiles, the row-wise read path is significantly different from the write path. We can still use the RCFile.Reader class to read an RCFile row by row (RCFileRecordReader is not needed), but in addition we need a ColumnarSerDe to convert the columnar data back into rows.
Following is the most simplified code we could arrive at for reading an RCFile row-wise. Please refer to the inline code comments for more details.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
import org.apache.hadoop.hive.serde2.columnar.ColumnarStruct;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.io.LongWritable;

private static void readRCFileByRow(String pathStr)
    throws IOException, SerDeException {

  final Configuration conf = new Configuration();
  final Properties tbl = new Properties();

  /*
   * Set the column names and types using comma-separated strings.
   * The actual names of the columns are not important, as long as the
   * column count is correct.
   *
   * For types, this example uses strings. byte[] can be stored as string
   * by encoding the bytes to ASCII (such as hexString or Base64).
   *
   * The number of columns and the number of types must match exactly.
   */
  tbl.setProperty("columns", "col1,col2,col3,col4,col5");
  tbl.setProperty("columns.types", "string:string:string:string:string");

  /*
   * We need a ColumnarSerDe to deserialize the columnar data to row-wise
   * data.
   */
  ColumnarSerDe serDe = new ColumnarSerDe();
  serDe.initialize(conf, tbl);

  Path path = new Path(pathStr);
  FileSystem fs = FileSystem.get(conf);
  final RCFile.Reader reader = new RCFile.Reader(fs, path, conf);
  final LongWritable key = new LongWritable();
  final BytesRefArrayWritable cols = new BytesRefArrayWritable();

  while (reader.next(key)) {
    System.out.println("Getting next row.");
    /*
     * IMPORTANT: Pass the same cols object to the getCurrentRow API; do not
     * create a new BytesRefArrayWritable() each time. One call to
     * getCurrentRow(cols) can read more than one column's values, which the
     * SerDe below takes care to read one by one.
     */
    reader.getCurrentRow(cols);

    final ColumnarStruct row = (ColumnarStruct) serDe.deserialize(cols);
    final ArrayList<Object> objects = row.getFieldsAsList();
    for (final Object object : objects) {
      // Lazy decompression happens here
      final String payload =
          ((LazyString) object).getWritableObject().toString();
      System.out.println("Value:" + payload);
    }
  }
  reader.close();
}
In this code, getCurrentRow() still reads the data column-wise, and we need the SerDe to convert it to a row. Also, calling getCurrentRow() does not mean that all the fields in the row have been decompressed: with lazy decompression, a column is not decompressed until one of its fields is deserialized. This is why we use ColumnarStruct.getFieldsAsList() to get a list of references to the lazy objects; the actual reading happens in the getWritableObject() call on the LazyString reference.
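If only some of the columns are needed, this laziness can be exploited by touching just those fields. A minimal sketch, assuming the same reader, serDe, and cols as above, and assuming ColumnarStruct exposes getField(int) as the per-field accessor:
// Only the field actually accessed is decompressed; the other columns
// referenced by cols stay untouched.
reader.getCurrentRow(cols);
final ColumnarStruct row = (ColumnarStruct) serDe.deserialize(cols);
final Object col2 = row.getField(1); // zero-based field index
final String value = ((LazyString) col2).getWritableObject().toString();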
Another way of achieving the same thing would be to use a StructObjectInspector and the copyToStandardObject API, but I find the above method simpler.
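For reference, a minimal sketch of that alternative, reusing serDe and cols from the loop above (the JAVA copy option used here is an assumption; it asks for plain Strings rather than Text writables):
// Requires org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils
// and java.util.List. Deserialize lazily, then copy the whole struct to
// standard Java objects in one call.
final StructObjectInspector soi =
    (StructObjectInspector) serDe.getObjectInspector();
final Object lazyRow = serDe.deserialize(cols);
// copyToStandardObject walks every field (forcing decompression) and
// returns a List<Object> for a struct.
final List<?> fields = (List<?>) ObjectInspectorUtils.copyToStandardObject(
    lazyRow, soi, ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
for (final Object field : fields) {
  System.out.println("Value:" + field);
}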
Upvotes: 1
Reputation: 21
After some more digging, I've found a solution. The key here is to not use RCFile.Reader but to use RCFileRecordReader.
Here is what I ended up with, adapted to open multiple files as well:
try
{
    FileSystem fs = FileSystem.get(job);
    FileStatus[] fileStatuses = fs.listStatus(new Path("/path/to/dir/"));
    LongWritable key = new LongWritable();
    BytesRefArrayWritable value = new BytesRefArrayWritable();
    int counter = 1;

    for (int i = 0; i < fileStatuses.length; i++)
    {
        FileStatus fileStatus = fileStatuses[i];
        if (!fileStatus.isDir())
        {
            System.out.println("File: " + fileStatus);
            FileSplit split = new FileSplit(fileStatus.getPath(), 0, fileStatus.getLen(), job);
            RCFileRecordReader<LongWritable, BytesRefArrayWritable> reader =
                new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(job, split);

            while (reader.next(key, value))
            {
                System.out.println("Getting row " + counter);
                AllCountriesRow acr = AllCountriesRow.valueOf(value);
                System.out.println("ROW: " + acr);
                counter++;
            }
        }
    }
}
catch (IOException e)
{
    throw new Error(e);
}
And AllCountriesRow.valueOf (note that Column is an enum of the columns in the order that they appear in each row, and serDe is an instance of ColumnarSerDe):
public static AllCountriesRow valueOf(BytesRefArrayWritable braw) throws IOException
{
    try
    {
        StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector();
        Object row = serDe.deserialize(braw);
        List<? extends StructField> fieldRefs = soi.getAllStructFieldRefs();

        Object fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.ID.ordinal()));
        ObjectInspector oi = fieldRefs.get(Column.ID.ordinal()).getFieldObjectInspector();
        int id = ((IntObjectInspector) oi).get(fieldData);

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.NAME.ordinal()));
        oi = fieldRefs.get(Column.NAME.ordinal()).getFieldObjectInspector();
        String name = ((StringObjectInspector) oi).getPrimitiveJavaObject(fieldData);

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.CODE.ordinal()));
        oi = fieldRefs.get(Column.CODE.ordinal()).getFieldObjectInspector();
        String code = ((StringObjectInspector) oi).getPrimitiveJavaObject(fieldData);

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.REGION_NAME.ordinal()));
        oi = fieldRefs.get(Column.REGION_NAME.ordinal()).getFieldObjectInspector();
        String regionName = ((StringObjectInspector) oi).getPrimitiveJavaObject(fieldData);

        fieldData = soi.getStructFieldData(row, fieldRefs.get(Column.CONTINENT_ID.ordinal()));
        oi = fieldRefs.get(Column.CONTINENT_ID.ordinal()).getFieldObjectInspector();
        int continentId = ((IntObjectInspector) oi).get(fieldData);

        return new AllCountriesRow(id, name, code, regionName, continentId);
    }
    catch (SerDeException e)
    {
        throw new IOException(e);
    }
}
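The Column enum and serDe instance are not shown above; plausible definitions, with names and types inferred from the inspectors used in valueOf (Int inspectors for ID and CONTINENT_ID, String inspectors for the rest), might look like this:
// Assumed definitions, not from the original post. Column order must match
// the order of the columns in the RCFile.
private enum Column
{
    ID, NAME, CODE, REGION_NAME, CONTINENT_ID
}

private static final ColumnarSerDe serDe = createSerDe();

private static ColumnarSerDe createSerDe()
{
    try
    {
        Properties tbl = new Properties();
        tbl.setProperty("columns", "id,name,code,region_name,continent_id");
        tbl.setProperty("columns.types", "int:string:string:string:int");
        ColumnarSerDe serDe = new ColumnarSerDe();
        serDe.initialize(new Configuration(), tbl);
        return serDe;
    }
    catch (SerDeException e)
    {
        throw new Error(e);
    }
}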
This ends up with an AllCountriesRow object containing all of the fields of the relevant row.
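To complete the map-side join described in the question, each row can then be cached in a HashMap during configure(); a hypothetical sketch (the getId() accessor is an assumption, it is not shown above):
// Hypothetical: cache each row by id for the map-side join
// (getId() is assumed; any unique key would do).
Map<Integer, AllCountriesRow> countriesById = new HashMap<Integer, AllCountriesRow>();
// ... then, inside the while (reader.next(key, value)) loop, instead of printing:
countriesById.put(acr.getId(), acr);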
Upvotes: 0