let hivesterix use hyracks-hdfs for its HDFS scans

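The Hive scan path now goes through the hyracks-hdfs package: HiveScanRuntimeGenerator
computes the input splits itself, asks the hyracks-hdfs Scheduler for location
constraints, and builds an HDFSReadOperatorDescriptor with a HiveKeyValueParserFactory,
so the old filescan operator, split provider, and tuple-parser classes are removed.
ConfUtil now caches and exposes the node-controller map for the scheduler, and the
hyracks-hdfs IKeyValueParser/ITupleWriter interfaces gain an open()/close() lifecycle
(flush() is renamed to close(), and getTupleWriter() now takes the task context).

For illustration only (not part of this patch), a parser written against the revised
IKeyValueParser lifecycle looks roughly like the sketch below, mirroring what
TextKeyValueParserFactory and HiveKeyValueParser do; the class name LineParserFactory
is made up for the example:

    import java.nio.ByteBuffer;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    import edu.uci.ics.hyracks.api.comm.IFrameWriter;
    import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
    import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
    import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
    import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
    import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
    import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;

    // Hypothetical example class, not included in this change.
    public class LineParserFactory implements IKeyValueParserFactory<LongWritable, Text> {
        private static final long serialVersionUID = 1L;

        @Override
        public IKeyValueParser<LongWritable, Text> createKeyValueParser(final IHyracksTaskContext ctx)
                throws HyracksDataException {
            return new IKeyValueParser<LongWritable, Text>() {
                // one output field per record: the raw text of the value
                private final ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
                private ByteBuffer buffer;
                private FrameTupleAppender appender;

                @Override
                public void open(IFrameWriter writer) throws HyracksDataException {
                    // open() is the new per-task setup hook: allocate the output frame once
                    buffer = ctx.allocateFrame();
                    appender = new FrameTupleAppender(ctx.getFrameSize());
                    appender.reset(buffer, true);
                }

                @Override
                public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
                    // called once per key-value pair produced by the Hadoop record reader
                    tb.reset();
                    tb.addField(value.getBytes(), 0, value.getLength());
                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                        // frame is full: ship it and retry the append on a fresh frame
                        FrameUtils.flushFrame(buffer, writer);
                        appender.reset(buffer, true);
                        if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                            throw new HyracksDataException("tuple is larger than a frame");
                        }
                    }
                }

                @Override
                public void close(IFrameWriter writer) throws HyracksDataException {
                    // close() replaces flush(): push out whatever is still buffered
                    if (appender.getTupleCount() > 0) {
                        FrameUtils.flushFrame(buffer, writer);
                    }
                }
            };
        }
    }
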
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_release_cleanup@3083 123451ca-8445-de46-9d55-352943316053
diff --git a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
index d41bdc8..666d361 100644
--- a/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
+++ b/hivesterix/hivesterix-common/src/main/java/edu/uci/ics/hivesterix/common/config/ConfUtil.java
@@ -27,6 +27,7 @@
 
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public class ConfUtil {
+    private static final String clusterPropertiesPath = "conf/cluster.properties";
 
     private static JobConf job;
     private static HiveConf hconf;
@@ -34,8 +35,8 @@
     private static Map<String, List<String>> ncMapping;
     private static IHyracksClientConnection hcc = null;
     private static ClusterTopology topology = null;
-    private static final String clusterPropertiesPath = "conf/cluster.properties";
     private static Properties clusterProps;
+    private static Map<String, NodeControllerInfo> ncNameToNcInfos;
 
     public static JobConf getJobConf(Class<? extends InputFormat> format, Path path) {
         JobConf conf = new JobConf();
@@ -109,6 +110,17 @@
         return ncMapping;
     }
 
+    public static Map<String, NodeControllerInfo> getNodeControllerInfo() throws HyracksException {
+        if (ncNameToNcInfos == null) {
+            try {
+                loadClusterConfig();
+            } catch (Exception e) {
+                throw new HyracksException(e);
+            }
+        }
+        return ncNameToNcInfos;
+    }
+
     private static void loadClusterConfig() {
         try {
             getHiveConf();
@@ -132,7 +144,7 @@
 
             hcc = new HyracksConnection(ipAddress, port);
             topology = hcc.getClusterTopology();
-            Map<String, NodeControllerInfo> ncNameToNcInfos = hcc.getNodeControllerInfos();
+            ncNameToNcInfos = hcc.getNodeControllerInfos();
             NCs = new String[ncNameToNcInfos.size() * mpl];
             ncMapping = new HashMap<String, List<String>>();
             int i = 0;
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
index f3af915..0d2c78a 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveScanRuntimeGenerator.java
@@ -1,16 +1,17 @@
 package edu.uci.ics.hivesterix.runtime.jobgen;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 
 import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveFileScanOperatorDescriptor;
-import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveFileSplitProvider;
-import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveTupleParserFactory;
+import edu.uci.ics.hivesterix.runtime.operator.filescan.HiveKeyValueParserFactory;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
@@ -19,12 +20,13 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.HDFSReadOperatorDescriptor;
+import edu.uci.ics.hyracks.hdfs.scheduler.Scheduler;
 
 @SuppressWarnings({ "rawtypes", "deprecation" })
 public class HiveScanRuntimeGenerator {
@@ -80,15 +82,18 @@
             // get record descriptor
             RecordDescriptor recDescriptor = mkRecordDescriptor(propagatedSchema, schemaTypes, context);
 
-            // setup the run time operator
+            // setup the run time operator and constraints
             JobConf conf = ConfUtil.getJobConf(fileDesc.getInputFileFormatClass(), filePath);
-            int clusterSize = ConfUtil.getNCs().length;
-            IFileSplitProvider fsprovider = new HiveFileSplitProvider(conf, filePathName, clusterSize);
-            ITupleParserFactory tupleParserFactory = new HiveTupleParserFactory(fileDesc, conf, outputColumnsOffset);
-            HiveFileScanOperatorDescriptor opDesc = new HiveFileScanOperatorDescriptor(jobSpec, fsprovider,
-                    tupleParserFactory, recDescriptor);
+            String[] locConstraints = ConfUtil.getNCs();
+            Map<String, NodeControllerInfo> ncNameToNcInfos = ConfUtil.getNodeControllerInfo();
+            Scheduler scheduler = new Scheduler(ncNameToNcInfos);
+            InputSplit[] splits = conf.getInputFormat().getSplits(conf, locConstraints.length);
+            String[] schedule = scheduler.getLocationConstraints(splits);
+            IOperatorDescriptor scanner = new HDFSReadOperatorDescriptor(jobSpec, recDescriptor, conf, splits,
+                    schedule, new HiveKeyValueParserFactory(fileDesc, conf, outputColumnsOffset));
 
-            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(opDesc, opDesc.getPartitionConstraint());
+            return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner,
+                    new AlgebricksAbsolutePartitionConstraint(locConstraints));
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java
deleted file mode 100644
index 03f3312..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveFileSplitProvider.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-
-public abstract class AbstractHiveFileSplitProvider implements IFileSplitProvider {
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public FileSplit[] getFileSplits() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @SuppressWarnings("deprecation")
-    public abstract org.apache.hadoop.mapred.FileSplit[] getFileSplitArray();
-
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java
deleted file mode 100644
index 485e1d0..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/AbstractHiveTupleParser.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.InputStream;
-
-import org.apache.hadoop.mapred.FileSplit;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-
-@SuppressWarnings("deprecation")
-public abstract class AbstractHiveTupleParser implements ITupleParser {
-
-    @Override
-    public void parse(InputStream in, IFrameWriter writer) throws HyracksDataException {
-        // empty implementation
-    }
-
-    /**
-     * method for parsing HDFS file split
-     * 
-     * @param split
-     * @param writer
-     */
-    abstract public void parse(FileSplit split, IFrameWriter writer) throws HyracksDataException;
-
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
deleted file mode 100644
index a64d398..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileScanOperatorDescriptor.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.hadoop.mapred.FileSplit;
-
-import edu.uci.ics.hivesterix.common.config.ConfUtil;
-import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-@SuppressWarnings("deprecation")
-public class HiveFileScanOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * tuple parser factory
-     */
-    private final ITupleParserFactory tupleParserFactory;
-
-    /**
-     * Hive file split
-     */
-    private Partition[] parts;
-
-    /**
-     * IFileSplitProvider
-     */
-    private IFileSplitProvider fileSplitProvider;
-
-    /**
-     * constrains in the form of host DNS names
-     */
-    private String[] constraintsByHostNames;
-
-    /**
-     * ip-to-node controller mapping
-     */
-    private Map<String, List<String>> ncMapping;
-
-    /**
-     * an array of NCs
-     */
-    private String[] NCs;
-
-    /**
-     * @param spec
-     * @param fsProvider
-     */
-    public HiveFileScanOperatorDescriptor(JobSpecification spec, IFileSplitProvider fsProvider,
-            ITupleParserFactory tupleParserFactory, RecordDescriptor rDesc) {
-        super(spec, 0, 1);
-        this.tupleParserFactory = tupleParserFactory;
-        recordDescriptors[0] = rDesc;
-        fileSplitProvider = fsProvider;
-    }
-
-    /**
-     * set partition constraint at the first time it is called the number of
-     * partitions is obtained from HDFS name node
-     */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws AlgebricksException {
-        try {
-            FileSplit[] returnedSplits = ((AbstractHiveFileSplitProvider) fileSplitProvider).getFileSplitArray();
-            Random random = new Random(System.currentTimeMillis());
-            ncMapping = ConfUtil.getNCMapping();
-            NCs = ConfUtil.getNCs();
-
-            int size = 0;
-            for (FileSplit split : returnedSplits)
-                if (split != null)
-                    size++;
-
-            FileSplit[] splits = new FileSplit[size];
-            for (int i = 0; i < returnedSplits.length; i++)
-                if (returnedSplits[i] != null)
-                    splits[i] = returnedSplits[i];
-
-            System.out.println("number of splits: " + splits.length);
-            constraintsByHostNames = new String[splits.length];
-            for (int i = 0; i < splits.length; i++) {
-                try {
-                    String[] loc = splits[i].getLocations();
-                    Collections.shuffle(Arrays.asList(loc), random);
-                    if (loc.length > 0) {
-                        InetAddress[] allIps = InetAddress.getAllByName(loc[0]);
-                        for (InetAddress ip : allIps) {
-                            if (ncMapping.get(ip.getHostAddress()) != null) {
-                                List<String> ncs = ncMapping.get(ip.getHostAddress());
-                                int pos = random.nextInt(ncs.size());
-                                constraintsByHostNames[i] = ncs.get(pos);
-                            } else {
-                                int pos = random.nextInt(NCs.length);
-                                constraintsByHostNames[i] = NCs[pos];
-                            }
-                        }
-                    } else {
-                        int pos = random.nextInt(NCs.length);
-                        constraintsByHostNames[i] = NCs[pos];
-                        if (splits[i].getLength() > 0)
-                            throw new IllegalStateException("non local scanner non locations!!");
-                    }
-                } catch (IOException e) {
-                    throw new AlgebricksException(e);
-                }
-            }
-
-            parts = new Partition[splits.length];
-            for (int i = 0; i < splits.length; i++) {
-                parts[i] = new Partition(splits[i]);
-            }
-            return new AlgebricksAbsolutePartitionConstraint(constraintsByHostNames);
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
-            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
-
-        final ITupleParser tp = tupleParserFactory.createTupleParser(ctx);
-        final int partitionId = partition;
-
-        return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
-            @Override
-            public void initialize() throws HyracksDataException {
-                writer.open();
-                FileSplit split = parts[partitionId].toFileSplit();
-                if (split == null)
-                    throw new HyracksDataException("partition " + partitionId + " is null!");
-                ((AbstractHiveTupleParser) tp).parse(split, writer);
-                writer.close();
-            }
-        };
-    }
-}
\ No newline at end of file
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java
deleted file mode 100644
index af52f27..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveFileSplitProvider.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.UUID;
-
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.eclipse.jetty.util.log.Log;
-
-@SuppressWarnings({ "deprecation", "rawtypes" })
-public class HiveFileSplitProvider extends AbstractHiveFileSplitProvider {
-    private static final long serialVersionUID = 1L;
-
-    private transient InputFormat format;
-    private transient JobConf conf;
-    private String confContent;
-    final private int nPartition;
-    private transient FileSplit[] splits;
-
-    public HiveFileSplitProvider(JobConf conf, String filePath, int nPartition) {
-        format = conf.getInputFormat();
-        this.conf = conf;
-        this.nPartition = nPartition;
-        writeConfContent();
-    }
-
-    private void writeConfContent() {
-        File dir = new File("hadoop-conf-tmp");
-        if (!dir.exists()) {
-            dir.mkdir();
-        }
-
-        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
-        try {
-            DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
-            conf.writeXml(out);
-            out.close();
-
-            DataInputStream in = new DataInputStream(new FileInputStream(fileName));
-            StringBuffer buffer = new StringBuffer();
-            String line;
-            while ((line = in.readLine()) != null) {
-                buffer.append(line + "\n");
-            }
-            in.close();
-            confContent = buffer.toString();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void readConfContent() {
-        File dir = new File("hadoop-conf-tmp");
-        if (!dir.exists()) {
-            dir.mkdir();
-        }
-
-        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
-        try {
-            PrintWriter out = new PrintWriter((new OutputStreamWriter(new FileOutputStream(new File(fileName)))));
-            out.write(confContent);
-            out.close();
-            conf = new JobConf(fileName);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    /**
-     * get the HDFS file split
-     */
-    public FileSplit[] getFileSplitArray() {
-        readConfContent();
-        conf.setClassLoader(this.getClass().getClassLoader());
-        format = conf.getInputFormat();
-        // int splitSize = conf.getInt("mapred.min.split.size", 0);
-
-        if (splits == null) {
-            try {
-                splits = (org.apache.hadoop.mapred.FileSplit[]) format.getSplits(conf, nPartition);
-                System.out.println("hdfs split number: " + splits.length);
-            } catch (IOException e) {
-                String inputPath = conf.get("mapred.input.dir");
-                String hdfsURL = conf.get("fs.default.name");
-                String alternatePath = inputPath.replaceAll(hdfsURL, "file:");
-                conf.set("mapred.input.dir", alternatePath);
-                try {
-                    splits = (org.apache.hadoop.mapred.FileSplit[]) format.getSplits(conf, nPartition);
-                    System.out.println("hdfs split number: " + splits.length);
-                } catch (IOException e1) {
-                    e1.printStackTrace();
-                    Log.debug(e1.getMessage());
-                    return null;
-                }
-            }
-        }
-        return splits;
-    }
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
new file mode 100644
index 0000000..472994a
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParser.java
@@ -0,0 +1,209 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import edu.uci.ics.hivesterix.serde.parser.IHiveParser;
+import edu.uci.ics.hivesterix.serde.parser.TextToBinaryTupleParser;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+
+@SuppressWarnings("deprecation")
+public class HiveKeyValueParser<K, V> implements IKeyValueParser<K, V> {
+    /**
+     * the columns to output: projection is pushed into this scan
+     */
+    private int[] outputColumnsOffset;
+
+    /**
+     * serialization/de-serialization object
+     */
+    private SerDe serDe;
+
+    /**
+     * the input row object inspector
+     */
+    private StructObjectInspector structInspector;
+
+    /**
+     * the hadoop job conf
+     */
+    private JobConf job;
+
+    /**
+     * Hyracks context to control resource allocation
+     */
+    private final IHyracksTaskContext ctx;
+
+    /**
+     * lazy serde: format flow in between operators
+     */
+    private final SerDe outputSerDe;
+
+    /**
+     * the parser from hive data to binary data
+     */
+    private IHiveParser parser;
+
+    /**
+     * the buffer for buffering output data
+     */
+    private ByteBuffer buffer;
+
+    /**
+     * the frame tuple appender
+     */
+    private FrameTupleAppender appender;
+
+    /**
+     * the array tuple builder
+     */
+    private ArrayTupleBuilder tb;
+
+    /**
+     * the field references of all fields
+     */
+    private List<? extends StructField> fieldRefs;
+
+    /**
+     * output fields
+     */
+    private Object[] outputFields;
+
+    /**
+     * output field references
+     */
+    private StructField[] outputFieldRefs;
+
+    public HiveKeyValueParser(String serDeClass, String outputSerDeClass, Properties tbl, JobConf conf,
+            final IHyracksTaskContext ctx, int[] outputColumnsOffset) throws HyracksDataException {
+        try {
+            job = conf;
+            // initialize the input serde
+            serDe = (SerDe) ReflectionUtils.newInstance(Class.forName(serDeClass), job);
+            serDe.initialize(job, tbl);
+            // initialize the output serde
+            outputSerDe = (SerDe) ReflectionUtils.newInstance(Class.forName(outputSerDeClass), job);
+            outputSerDe.initialize(job, tbl);
+            // object inspector of the row
+            structInspector = (StructObjectInspector) serDe.getObjectInspector();
+            // hyracks context
+            this.ctx = ctx;
+            this.outputColumnsOffset = outputColumnsOffset;
+
+            if (structInspector instanceof LazySimpleStructObjectInspector) {
+                LazySimpleStructObjectInspector rowInspector = (LazySimpleStructObjectInspector) structInspector;
+                List<? extends StructField> fieldRefs = rowInspector.getAllStructFieldRefs();
+                boolean lightWeightParsable = true;
+                for (StructField fieldRef : fieldRefs) {
+                    Category category = fieldRef.getFieldObjectInspector().getCategory();
+                    if (!(category == Category.PRIMITIVE)) {
+                        lightWeightParsable = false;
+                        break;
+                    }
+                }
+                if (lightWeightParsable) {
+                    parser = new TextToBinaryTupleParser(this.outputColumnsOffset, structInspector);
+                }
+            }
+
+            fieldRefs = structInspector.getAllStructFieldRefs();
+            int size = 0;
+            for (int i = 0; i < outputColumnsOffset.length; i++) {
+                if (outputColumnsOffset[i] >= 0) {
+                    size++;
+                }
+            }
+
+            tb = new ArrayTupleBuilder(size);
+            outputFieldRefs = new StructField[size];
+            outputFields = new Object[size];
+            for (int i = 0; i < outputColumnsOffset.length; i++)
+                if (outputColumnsOffset[i] >= 0)
+                    outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void open(IFrameWriter writer) throws HyracksDataException {
+        buffer = ctx.allocateFrame();
+        appender = new FrameTupleAppender(ctx.getFrameSize());
+        appender.reset(buffer, true);
+    }
+
+    @Override
+    public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException {
+        try {
+            tb.reset();
+            if (parser != null) {
+                Text text = (Text) value;
+                parser.parse(text.getBytes(), 0, text.getLength(), tb);
+            } else {
+                Object row = serDe.deserialize((Writable) value);
+                /**
+                 * write fields to the tuple builder one by one
+                 */
+                int i = 0;
+                for (StructField fieldRef : fieldRefs) {
+                    if (outputColumnsOffset[i] >= 0)
+                        outputFields[outputColumnsOffset[i]] = structInspector.getStructFieldData(row, fieldRef);
+                    i++;
+                }
+                i = 0;
+                DataOutput dos = tb.getDataOutput();
+                for (Object field : outputFields) {
+                    BytesWritable fieldWritable = (BytesWritable) outputSerDe.serialize(field,
+                            outputFieldRefs[i].getFieldObjectInspector());
+                    dos.write(fieldWritable.getBytes(), 0, fieldWritable.getSize());
+                    tb.addFieldEndOffset();
+                    i++;
+                }
+            }
+
+            /**
+             * append the tuple and flush it if necessary.
+             */
+            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                FrameUtils.flushFrame(buffer, writer);
+                appender.reset(buffer, true);
+                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
+                    throw new IllegalStateException();
+                }
+            }
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+    }
+
+    @Override
+    public void close(IFrameWriter writer) throws HyracksDataException {
+        /**
+         * flush the residual tuples
+         */
+        if (appender.getTupleCount() > 0) {
+            FrameUtils.flushFrame(buffer, writer);
+        }
+        System.gc();
+    }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java
new file mode 100644
index 0000000..05903b9
--- /dev/null
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveKeyValueParserFactory.java
@@ -0,0 +1,39 @@
+package edu.uci.ics.hivesterix.runtime.operator.filescan;
+
+import java.util.Properties;
+
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.JobConf;
+
+import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParser;
+import edu.uci.ics.hyracks.hdfs.api.IKeyValueParserFactory;
+import edu.uci.ics.hyracks.hdfs.dataflow.ConfFactory;
+
+@SuppressWarnings("deprecation")
+public class HiveKeyValueParserFactory<K, V> implements IKeyValueParserFactory<K, V> {
+    private static final long serialVersionUID = 1L;
+    private final String serDeClass;
+    private final String outputSerDeClass = LazySerDe.class.getName();
+    private final Properties tbl;
+    private final ConfFactory confFactory;
+    private final int[] outputColumnsOffset;
+
+    public HiveKeyValueParserFactory(PartitionDesc desc, JobConf conf, int[] outputColumnsOffset)
+            throws HyracksDataException {
+        this.tbl = desc.getProperties();
+        this.serDeClass = (String) tbl.getProperty("serialization.lib");
+        this.outputColumnsOffset = outputColumnsOffset;
+        this.confFactory = new ConfFactory(conf);
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Override
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException {
+        return new HiveKeyValueParser(serDeClass, outputSerDeClass, tbl, confFactory.getConf(), ctx,
+                outputColumnsOffset);
+    }
+
+}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
deleted file mode 100644
index 718c311..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParser.java
+++ /dev/null
@@ -1,215 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import edu.uci.ics.hivesterix.serde.parser.IHiveParser;
-import edu.uci.ics.hivesterix.serde.parser.TextToBinaryTupleParser;
-import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
-
-@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
-public class HiveTupleParser extends AbstractHiveTupleParser {
-
-    private int[] outputColumnsOffset;
-    /**
-     * class of input format
-     */
-    private InputFormat inputFormat;
-
-    /**
-     * serialization/deserialization object
-     */
-    private SerDe serDe;
-
-    /**
-     * the input row object inspector
-     */
-    private ObjectInspector objectInspector;
-
-    /**
-     * the hadoop job conf
-     */
-    private JobConf job;
-
-    /**
-     * Hyrax context to control resource allocation
-     */
-    private final IHyracksTaskContext ctx;
-
-    /**
-     * lazy serde: format flow in between operators
-     */
-    private final SerDe outputSerDe;
-
-    /**
-     * the parser from hive data to binary data
-     */
-    private IHiveParser parser = null;
-
-    /**
-     * parser for any hive input format
-     * 
-     * @param inputFormatClass
-     * @param serDeClass
-     * @param tbl
-     * @param conf
-     * @throws AlgebricksException
-     */
-    public HiveTupleParser(String inputFormatClass, String serDeClass, String outputSerDeClass, Properties tbl,
-            JobConf conf, final IHyracksTaskContext ctx, int[] outputColumnsOffset) throws AlgebricksException {
-        try {
-            conf.setClassLoader(this.getClass().getClassLoader());
-
-            inputFormat = (InputFormat) ReflectionUtils.newInstance(Class.forName(inputFormatClass), conf);
-            job = conf;
-
-            // initialize the input serde
-            serDe = (SerDe) ReflectionUtils.newInstance(Class.forName(serDeClass), job);
-            serDe.initialize(job, tbl);
-
-            // initialize the output serde
-            outputSerDe = (SerDe) ReflectionUtils.newInstance(Class.forName(outputSerDeClass), job);
-            outputSerDe.initialize(job, tbl);
-
-            // object inspector of the row
-            objectInspector = serDe.getObjectInspector();
-
-            // hyracks context
-            this.ctx = ctx;
-            this.outputColumnsOffset = outputColumnsOffset;
-
-            if (objectInspector instanceof LazySimpleStructObjectInspector) {
-                LazySimpleStructObjectInspector rowInspector = (LazySimpleStructObjectInspector) objectInspector;
-                List<? extends StructField> fieldRefs = rowInspector.getAllStructFieldRefs();
-                boolean lightWeightParsable = true;
-                for (StructField fieldRef : fieldRefs) {
-                    Category category = fieldRef.getFieldObjectInspector().getCategory();
-                    if (!(category == Category.PRIMITIVE)) {
-                        lightWeightParsable = false;
-                        break;
-                    }
-                }
-                if (lightWeightParsable)
-                    parser = new TextToBinaryTupleParser(this.outputColumnsOffset, this.objectInspector);
-            }
-        } catch (Exception e) {
-            throw new AlgebricksException(e);
-        }
-    }
-
-    /**
-     * parse a input HDFS file split, the result is send to the writer
-     * one-frame-a-time
-     * 
-     * @param split
-     *            the HDFS file split
-     * @param writer
-     *            the writer
-     * @throws HyracksDataException
-     *             if there is sth. wrong in the ser/de
-     */
-    @Override
-    public void parse(FileSplit split, IFrameWriter writer) throws HyracksDataException {
-        try {
-            StructObjectInspector structInspector = (StructObjectInspector) objectInspector;
-
-            // create the reader, key, and value
-            RecordReader reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
-            Object key = reader.createKey();
-            Object value = reader.createValue();
-
-            // allocate a new frame
-            ByteBuffer frame = ctx.allocateFrame();
-            FrameTupleAppender appender = new FrameTupleAppender(ctx.getFrameSize());
-            appender.reset(frame, true);
-
-            List<? extends StructField> fieldRefs = structInspector.getAllStructFieldRefs();
-            int size = 0;
-            for (int i = 0; i < outputColumnsOffset.length; i++)
-                if (outputColumnsOffset[i] >= 0)
-                    size++;
-
-            ArrayTupleBuilder tb = new ArrayTupleBuilder(size);
-            DataOutput dos = tb.getDataOutput();
-            StructField[] outputFieldRefs = new StructField[size];
-            Object[] outputFields = new Object[size];
-            for (int i = 0; i < outputColumnsOffset.length; i++)
-                if (outputColumnsOffset[i] >= 0)
-                    outputFieldRefs[outputColumnsOffset[i]] = fieldRefs.get(i);
-
-            while (reader.next(key, value)) {
-                // reuse the tuple builder
-                tb.reset();
-                if (parser != null) {
-                    Text text = (Text) value;
-                    parser.parse(text.getBytes(), 0, text.getLength(), tb);
-                } else {
-                    Object row = serDe.deserialize((Writable) value);
-                    // write fields to the tuple builder one by one
-                    int i = 0;
-                    for (StructField fieldRef : fieldRefs) {
-                        if (outputColumnsOffset[i] >= 0)
-                            outputFields[outputColumnsOffset[i]] = structInspector.getStructFieldData(row, fieldRef);
-                        i++;
-                    }
-
-                    i = 0;
-                    for (Object field : outputFields) {
-                        BytesWritable fieldWritable = (BytesWritable) outputSerDe.serialize(field,
-                                outputFieldRefs[i].getFieldObjectInspector());
-                        dos.write(fieldWritable.getBytes(), 0, fieldWritable.getSize());
-                        tb.addFieldEndOffset();
-                        i++;
-                    }
-                }
-
-                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                    if (appender.getTupleCount() <= 0)
-                        throw new IllegalStateException("zero tuples in a frame!");
-                    FrameUtils.flushFrame(frame, writer);
-                    appender.reset(frame, true);
-                    if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
-                        throw new IllegalStateException();
-                    }
-                }
-            }
-            reader.close();
-            System.gc();
-
-            // flush the last frame
-            if (appender.getTupleCount() > 0) {
-                FrameUtils.flushFrame(frame, writer);
-            }
-        } catch (IOException e) {
-            throw new HyracksDataException(e);
-        } catch (SerDeException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java
deleted file mode 100644
index 98d730f..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/HiveTupleParserFactory.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.Properties;
-import java.util.UUID;
-
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.mapred.JobConf;
-
-import edu.uci.ics.hivesterix.serde.lazy.LazySerDe;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParser;
-import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
-
-@SuppressWarnings("deprecation")
-public class HiveTupleParserFactory implements ITupleParserFactory {
-
-    private static final long serialVersionUID = 1L;
-
-    private int[] outputColumns;
-
-    private String outputSerDeClass = LazySerDe.class.getName();
-
-    private String inputSerDeClass;
-
-    private transient JobConf conf;
-
-    private Properties tbl;
-
-    private String confContent;
-
-    private String inputFormatClass;
-
-    public HiveTupleParserFactory(PartitionDesc desc, JobConf conf, int[] outputColumns) {
-        this.conf = conf;
-        tbl = desc.getProperties();
-        inputFormatClass = (String) tbl.getProperty("file.inputformat");
-        inputSerDeClass = (String) tbl.getProperty("serialization.lib");
-        this.outputColumns = outputColumns;
-
-        writeConfContent();
-    }
-
-    @Override
-    public ITupleParser createTupleParser(IHyracksTaskContext ctx) {
-        readConfContent();
-        try {
-            return new HiveTupleParser(inputFormatClass, inputSerDeClass, outputSerDeClass, tbl, conf, ctx,
-                    outputColumns);
-        } catch (Exception e) {
-            e.printStackTrace();
-            return null;
-        }
-    }
-
-    private void writeConfContent() {
-        File dir = new File("hadoop-conf-tmp");
-        if (!dir.exists()) {
-            dir.mkdir();
-        }
-
-        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
-        try {
-            DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(fileName)));
-            conf.writeXml(out);
-            out.close();
-
-            DataInputStream in = new DataInputStream(new FileInputStream(fileName));
-            StringBuffer buffer = new StringBuffer();
-            String line;
-            while ((line = in.readLine()) != null) {
-                buffer.append(line + "\n");
-            }
-            in.close();
-            confContent = buffer.toString();
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    private void readConfContent() {
-        File dir = new File("hadoop-conf-tmp");
-        if (!dir.exists()) {
-            dir.mkdir();
-        }
-
-        String fileName = "hadoop-conf-tmp/" + UUID.randomUUID() + System.currentTimeMillis() + ".xml";
-        try {
-            PrintWriter out = new PrintWriter((new OutputStreamWriter(new FileOutputStream(new File(fileName)))));
-            out.write(confContent);
-            out.close();
-
-            conf = new JobConf(fileName);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-}
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java
deleted file mode 100644
index 6742a34..0000000
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/operator/filescan/Partition.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package edu.uci.ics.hivesterix.runtime.operator.filescan;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-
-@SuppressWarnings("deprecation")
-public class Partition implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    private String uri;
-    private long offset;
-    private long length;
-    private String[] locations;
-
-    public Partition() {
-    }
-
-    public Partition(FileSplit file) {
-        uri = file.getPath().toUri().toString();
-        offset = file.getStart();
-        length = file.getLength();
-        try {
-            locations = file.getLocations();
-        } catch (IOException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    public FileSplit toFileSplit() {
-        return new FileSplit(new Path(uri), offset, length, locations);
-    }
-}
diff --git a/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java b/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java
deleted file mode 100644
index 0c701c8..0000000
--- a/hivesterix/hivesterix-runtime/src/test/java/edu/uci/ics/hyracks/AppTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package edu.uci.ics.hyracks;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest 
-    extends TestCase
-{
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public AppTest( String testName )
-    {
-        super( testName );
-    }
-
-    /**
-     * @return the suite of tests being tested
-     */
-    public static Test suite()
-    {
-        return new TestSuite( AppTest.class );
-    }
-
-    /**
-     * Rigourous Test :-)
-     */
-    public void testApp()
-    {
-        assertTrue( true );
-    }
-}
diff --git a/hivesterix/hivesterix-serde/src/test/java/edu/uci/ics/hyracks/AppTest.java b/hivesterix/hivesterix-serde/src/test/java/edu/uci/ics/hyracks/AppTest.java
deleted file mode 100644
index 0c701c8..0000000
--- a/hivesterix/hivesterix-serde/src/test/java/edu/uci/ics/hyracks/AppTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package edu.uci.ics.hyracks;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest 
-    extends TestCase
-{
-    /**
-     * Create the test case
-     *
-     * @param testName name of the test case
-     */
-    public AppTest( String testName )
-    {
-        super( testName );
-    }
-
-    /**
-     * @return the suite of tests being tested
-     */
-    public static Test suite()
-    {
-        return new TestSuite( AppTest.class );
-    }
-
-    /**
-     * Rigourous Test :-)
-     */
-    public void testApp()
-    {
-        assertTrue( true );
-    }
-}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
index 5923e1e..5d35ec5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParser.java
@@ -29,12 +29,24 @@
 public interface IKeyValueParser<K, V> {
 
     /**
+     * Initialize the key value parser.
+     * 
+     * @param writer
+     *            The hyracks writer for outputting data.
+     * @throws HyracksDataException
+     */
+    public void open(IFrameWriter writer) throws HyracksDataException;
+
+    /**
      * Parse a key-value pair returned by HDFS record reader to a tuple.
      * when the parsers' internal buffer is full, it can flush the buffer to the writer
      * 
      * @param key
+     *            The key returned from Hadoop's InputReader.
      * @param value
+     *            The value returned from Hadoop's InputReader.
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
      */
     public void parse(K key, V value, IFrameWriter writer) throws HyracksDataException;
@@ -44,7 +56,8 @@
      * This method is called in the close() of HDFSReadOperatorDescriptor.
      * 
      * @param writer
+     *            The hyracks writer for outputting data.
      * @throws HyracksDataException
      */
-    public void flush(IFrameWriter writer) throws HyracksDataException;
+    public void close(IFrameWriter writer) throws HyracksDataException;
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
index 6e943ad..7d6f868 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/IKeyValueParserFactory.java
@@ -18,6 +18,7 @@
 import java.io.Serializable;
 
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 
 /**
  * Users need to implement this interface to use the HDFSReadOperatorDescriptor.
@@ -36,6 +37,6 @@
      *            the IHyracksTaskContext
      * @return a key-value parser instance.
      */
-    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx);
+    public IKeyValueParser<K, V> createKeyValueParser(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
index 25b9523..8e85627 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriter.java
@@ -26,6 +26,15 @@
 public interface ITupleWriter {
 
     /**
+     * Initialize the tuple writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void open(DataOutput output) throws HyracksDataException;
+
+    /**
      * Write the tuple to the DataOutput.
      * 
      * @param output
@@ -36,4 +45,13 @@
      */
     public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException;
 
+    /**
+     * Close the writer.
+     * 
+     * @param output
+     *            The channel for output data.
+     * @throws HyracksDataException
+     */
+    public void close(DataOutput output) throws HyracksDataException;
+
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
index 839de8f..9a025c2 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/api/ITupleWriterFactory.java
@@ -17,14 +17,19 @@
 
 import java.io.Serializable;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
 /**
  * Users need to implement this interface to use the HDFSWriteOperatorDescriptor.
  */
 public interface ITupleWriterFactory extends Serializable {
 
     /**
+     * @param ctx
+     *            the IHyracksTaskContext
      * @return a tuple writer instance
      */
-    public ITupleWriter getTupleWriter();
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) throws HyracksDataException;
 
 }
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index e924650..f49688b 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -102,6 +102,7 @@
                     JobConf conf = confFactory.getConf();
                     IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
                     writer.open();
+                    parser.open(writer);
                     InputFormat inputFormat = conf.getInputFormat();
                     for (int i = 0; i < inputSplits.length; i++) {
                         /**
@@ -131,7 +132,7 @@
                             }
                         }
                     }
-                    parser.flush(writer);
+                    parser.close(writer);
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
index ff97a29..36d5f55 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/dataflow/HDFSWriteOperatorDescriptor.java
@@ -89,7 +89,7 @@
                 String outputDirPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputDirPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf);
                     dos = dfs.create(new Path(fileName), true);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
index c691f5d..9574bb4 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextKeyValueParserFactory.java
@@ -43,6 +43,11 @@
         return new IKeyValueParser<LongWritable, Text>() {
 
             @Override
+            public void open(IFrameWriter writer) {
+
+            }
+
+            @Override
             public void parse(LongWritable key, Text value, IFrameWriter writer) throws HyracksDataException {
                 tb.reset();
                 tb.addField(value.getBytes(), 0, value.getLength());
@@ -56,7 +61,7 @@
             }
 
             @Override
-            public void flush(IFrameWriter writer) throws HyracksDataException {
+            public void close(IFrameWriter writer) throws HyracksDataException {
                 FrameUtils.flushFrame(buffer, writer);
             }
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
index d26721d..0da14e5 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs/lib/TextTupleWriterFactory.java
@@ -17,6 +17,7 @@
 
 import java.io.DataOutput;
 
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
 import edu.uci.ics.hyracks.hdfs.api.ITupleWriter;
@@ -26,9 +27,14 @@
     private static final long serialVersionUID = 1L;
 
     @Override
-    public ITupleWriter getTupleWriter() {
+    public ITupleWriter getTupleWriter(IHyracksTaskContext ctx) {
         return new ITupleWriter() {
-            byte newLine = "\n".getBytes()[0];
+            private byte newLine = "\n".getBytes()[0];
+
+            @Override
+            public void open(DataOutput output) {
+
+            }
 
             @Override
             public void write(DataOutput output, ITupleReference tuple) throws HyracksDataException {
@@ -43,6 +49,11 @@
                 }
             }
 
+            @Override
+            public void close(DataOutput output) {
+
+            }
+
         };
     }
 
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index eb7b6e1..9e9abdf 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -147,7 +147,7 @@
                             }
                         }
                     }
-                    parser.flush(writer);
+                    parser.close(writer);
                     writer.close();
                 } catch (Exception e) {
                     throw new HyracksDataException(e);
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
index 390a7b5..86ee527 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/edu/uci/ics/hyracks/hdfs2/dataflow/HDFSWriteOperatorDescriptor.java
@@ -88,7 +88,7 @@
                 String outputPath = FileOutputFormat.getOutputPath(conf).toString();
                 String fileName = outputPath + File.separator + "part-" + partition;
 
-                tupleWriter = tupleWriterFactory.getTupleWriter();
+                tupleWriter = tupleWriterFactory.getTupleWriter(ctx);
                 try {
                     FileSystem dfs = FileSystem.get(conf.getConfiguration());
                     dos = dfs.create(new Path(fileName), true);
diff --git a/pregelix/pregelix-example/pom.xml b/pregelix/pregelix-example/pom.xml
index b3f78e1..e643331 100644
--- a/pregelix/pregelix-example/pom.xml
+++ b/pregelix/pregelix-example/pom.xml
@@ -77,6 +77,7 @@
 				</configuration>
 			</plugin>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-clean-plugin</artifactId>
 				<version>2.5</version>
 				<configuration>