Saturday, January 7, 2012

RCFile: A Fast and Space-efficient Data Placement

RCFile (Record Columnar File) is a data placement structure for storing and quickly retrieving very large data sets.
It was developed at Facebook to overcome the challenges posed by large data sets on Hadoop.

It goes without saying that every Big Data use case involves solving three critical problems (you could also call them the NFRs of a Big Data solution):

1) Fast data loading - Typical use cases involve loading terabytes of data for analytics, so an efficient loading mechanism is highly desirable to reduce overheads and load the data in the minimum possible time.

2) Fast query processing - The time taken to process a query is vital for any kind of analytics, and it depends to a large extent on how the data is stored and partitioned.

3) Highly efficient storage space utilization - Data keeps growing (in Big Data we usually call this the "data explosion"), so the various types of compression need to be considered so that storage space is used efficiently.

RCFile addresses all three of these problems to a large extent.

A typical RDBMS partitions a table row-wise, while a column-oriented database partitions it column-wise.

RCFile combines the benefits of both: it first partitions the data horizontally into row groups, and then stores each row group column-wise.
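The idea can be sketched in plain Java (a toy model for illustration only, not the Hive API): rows are first grouped horizontally into fixed-size row groups, and each row group is then stored column by column, so values of the same column sit next to each other and compress well.

```java
import java.util.ArrayList;
import java.util.List;

public class RowGroupSketch {

    // Toy model: split rows into row groups of rowGroupSize, then store each
    // group column-wise (a list of columns, each column a list of cell values).
    static List<List<List<String>>> toRowGroups(List<String[]> rows, int rowGroupSize) {
        List<List<List<String>>> rowGroups = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += rowGroupSize) {
            int end = Math.min(start + rowGroupSize, rows.size());
            int numColumns = rows.get(start).length;
            List<List<String>> columns = new ArrayList<>();
            for (int c = 0; c < numColumns; c++) {
                List<String> column = new ArrayList<>();
                for (int r = start; r < end; r++) {
                    // vertical partition within the horizontal row group
                    column.add(rows.get(r)[c]);
                }
                columns.add(column);
            }
            rowGroups.add(columns);
        }
        return rowGroups;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            rows.add(new String[] { "id" + i, "name" + i, "city" + i });
        }
        List<List<List<String>>> groups = toRowGroups(rows, 2);
        System.out.println(groups.size());          // 2 row groups
        System.out.println(groups.get(0).get(1));   // column 1 of row group 0
    }
}
```

Reading a single column now only touches that column's contiguous values inside each row group, which is what makes column pruning and lazy decompression possible in the real format.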

The diagram below shows how a typical RDBMS data file is laid out within an RCFile.

[Diagram: a table split horizontally into row groups, with each row group stored column by column]
The RCFile API is provided in Hive and supports two different styles of implementation: -

1. It can be used in M/R jobs via RCFileOutputFormat, RCFileInputFormat and RCFileRecordReader
2. A Reader and a Writer that applications can use to read and write data to RC files in their own way
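For the first option, a minimal driver sketch might look like the following. This is a hedged, untested outline against the Hive 0.x mapred API; the mapper/reducer classes are omitted and the input/output paths are placeholders:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class RCFileJobSketch {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(RCFileJobSketch.class);
        // Tell the output format how many columns each row will have
        RCFileOutputFormat.setColumnNumber(job, 5);
        job.setOutputFormat(RCFileOutputFormat.class);
        // Mapper/reducer and input format omitted for brevity;
        // "in" and "out" below are placeholder paths
        FileInputFormat.setInputPaths(job, new Path("in"));
        FileOutputFormat.setOutputPath(job, new Path("out"));
        JobClient.runJob(job);
    }
}
```

The second option (Reader/Writer used directly) is what the example below demonstrates.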

Here is a small example that uses the RCFile Reader and Writer to write data and then read it back in two different ways (column-wise and row-wise).

Pre-requisites for running this example: -

1. Hadoop 0.20.* should be installed and running
2. Hive 0.6+ should be in classpath

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;

public class TestRCSimpleReadWrite /* extends RCFileCat */ /* extend RCFileCat only if this program will invoke an M/R job; RCFileCat basically extends Tool */ {

 Configuration conf;
 FileSystem fs;
 private static final int maxHiveColumns = 5;
 

 /**
  * Constructor
  */
 TestRCSimpleReadWrite() {
  try {
   System.setProperty("HADOOP_HOME", "D:\\myWork\\hadoop\\hadoop-0.20.2");
   conf = new Configuration();
   conf.addResource(new Path("D:\\myWork\\hadoop\\hadoop-0.20.2\\conf\\core-site.xml"));
   conf.addResource(new Path("D:\\myWork\\hadoop\\hadoop-0.20.2\\conf\\hdfs-site.xml"));
   // same key as RCFile.COLUMN_NUMBER_CONF_STR; tells the writer how many columns each row has
   conf.set("hive.io.rcfile.column.number.conf", String.valueOf(maxHiveColumns));
   fs = FileSystem.get(conf);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 /**
  * Writing Data to a file
  */
 public void writeRCData() {
  try {

   RCFile.Writer rcFileWriter = new RCFile.Writer(fs, conf, new Path("sumit/rctext"), null, new Metadata(), new BZip2Codec());
   // Number of rows
   for (int j = 0; j < 10; j++) {
    BytesRefArrayWritable dataWrite = new BytesRefArrayWritable(maxHiveColumns);
    // Number of columns in each row
    for (int i = 0; i < maxHiveColumns; i++) {
     Text column = new Text("ROW-NUM - " + j + ", COLUMN-NUM = " + i + "\n");
     BytesRefWritable bytesRefWritable = new BytesRefWritable();
     bytesRefWritable.set(column.getBytes(), 0, column.getLength());
     // set() grows the backing array if required and marks index i as valid
     dataWrite.set(i, bytesRefWritable);
    }
    rcFileWriter.append(dataWrite);
   }

   rcFileWriter.close();

  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 /*
  * Reading Column Wise Data
  */
 public void readColumnWiseRCFileData() {
  try {
   RCFile.Reader rcFileReader = new RCFile.Reader(fs, new Path("sumit/rctext"), conf);
   int counter = 1;
    // Advance to the next row group (columns batch)
    while (rcFileReader.nextColumnsBatch()) {
     System.out.println("READ COLUMN WISE - we are getting some data");

     // Iterate over each column, then over the rows within that column
     for (int i = 0; i < maxHiveColumns; i++) {

      BytesRefArrayWritable dataRead = rcFileReader.getColumn(i, null);
      for (int j = 0; j < dataRead.size(); j++) {
       BytesRefWritable bytesRefread = dataRead.get(j);
       // use start/length: getData() returns the whole shared backing buffer
       Text returnData = new Text();
       returnData.set(bytesRefread.getData(), bytesRefread.getStart(), bytesRefread.getLength());
       // This will PRINT the data for the current column
       System.out.println("READ-DATA = " + returnData.toString());
      }
     }

    System.out.println("Checking for next Iteration outer");

    counter++;
   }

  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 /*
  * Reading Row Wise Data
  */
 
 public void readRowWiseRCFileData() {
  try {
   RCFile.Reader rcFileReader = new RCFile.Reader(fs, new Path("sumit/rctext"), conf);
    int counter = 1;
    LongWritable rowID = new LongWritable();
    // Iterate row by row; next() fills in the row number
    while (rcFileReader.next(rowID)) {
     System.out.println("READ ROW WISE - we are getting some data for ROW = " + counter);
    BytesRefArrayWritable dataRead = new BytesRefArrayWritable();
    rcFileReader.getCurrentRow(dataRead);

     // Iterate over each column of the current row
     System.out.println("Size of Data Read - " + dataRead.size());
     for (int i = 0; i < dataRead.size(); i++) {
      BytesRefWritable bytesRefread = dataRead.get(i);
      // use start/length: getData() returns the whole shared backing buffer
      Text returnData = new Text();
      returnData.set(bytesRefread.getData(), bytesRefread.getStart(), bytesRefread.getLength());
      // This will PRINT the data for the current row
      System.out.println("READ-DATA = " + returnData.toString());
     }
    System.out.println("Checking for next Iteration");

    counter++;
   }

  } catch (Exception e) {
   e.printStackTrace();
  }
 }
 
 /**
  * Main Method
  * @param args
  */

 public static void main(String[] args) {
  try {

   TestRCSimpleReadWrite obj = new TestRCSimpleReadWrite();
   System.out.println("Start writing the Data");
   obj.writeRCData();
   System.out.println("Start reading Column Wise Data");
   obj.readColumnWiseRCFileData();
   System.out.println("Start reading Row Wise Data");
   obj.readRowWiseRCFileData();

  } catch (Exception e) {
   e.printStackTrace();
  }

 }

}
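The real payoff of the column-wise layout is column pruning: before opening a Reader you can tell it which columns you need, and the others are skipped rather than read and decompressed. A hedged sketch, assuming the Hive 0.6-era ColumnProjectionUtils API:

```java
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

// Configure column projection before constructing the RCFile.Reader
Configuration conf = new Configuration();
ArrayList<Integer> wantedColumns = new ArrayList<Integer>();
wantedColumns.add(1); // read only column 1
ColumnProjectionUtils.setReadColumnIDs(conf, wantedColumns);
// Any RCFile.Reader created with this conf will skip the other columns
```

With a projection like this, a query touching one column out of many only pays the I/O and decompression cost for that column within each row group.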


2 comments:

benslin kard said...

Cassandra is more suitable for real-time transaction processing and the serving of interactive data.

Sumit said...

Hi Benslin,
I am not sure you would choose Cassandra for "transactions".

If by "real time" you mean performance, then yes, you are right to an extent, but it also depends on the use case. For a use case where you need performance plus a strongly consistent model, I do not think Cassandra would be the right fit.