add Pregelix codebase

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
new file mode 100644
index 0000000..171495a
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -0,0 +1,614 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.join;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.core.util.TestUtils;
+import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
+
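+/**
+ * Tests the Pregelix join operators on the TPC-H 0.001 customer/orders data.
+ * Each round first runs a Hyracks hash-join job that writes the expected
+ * result file, then the corresponding Pregelix index nested-loop join job
+ * that writes the actual result file, and compares the two line by line.
+ */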
+public class JoinTest {
+    private final static String ACTUAL_RESULT_DIR = "actual";
+    private final static String EXPECT_RESULT_DIR = "expected";
+    private final static String ACTUAL_RESULT_FILE = ACTUAL_RESULT_DIR + File.separator + "join.txt";
+    private final static String EXPECTED_RESULT_FILE = EXPECT_RESULT_DIR + File.separator + "join.txt";
+    private final static String JOB_NAME = "JOIN_TEST";
+    private static final String HYRACKS_APP_NAME = "giraph";
+    private static final String NC1_ID = "nc1";
+    private static final String NC2_ID = "nc2";
+
+    private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/data.properties";
+
+    private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+    private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+    private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+
+    private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
+            UTF8StringPointable.FACTORY);
+    private IBinaryComparatorFactory stringComparatorFactory = new PointableBinaryComparatorFactory(
+            UTF8StringPointable.FACTORY);
+
+    private void cleanupStores() throws IOException {
+        FileUtils.forceMkdir(new File("teststore"));
+        FileUtils.forceMkdir(new File("build"));
+        FileUtils.cleanDirectory(new File("teststore"));
+        FileUtils.cleanDirectory(new File("build"));
+    }
+
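+    /**
+     * End-to-end scenario: create and bulk load the customer B-tree, then
+     * check the inner index join against a hash join, and the index right
+     * outer join against a left outer hash join.
+     */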
+    @Test
+    public void customerOrderCIDJoinMulti() throws Exception {
+        ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+        cleanupStores();
+        PregelixHyracksIntegrationUtil.init();
+        PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+
+        FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
+        FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        runCreate();
+        runBulkLoad();
+        runHashJoin();
+        runIndexJoin();
+        TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
+
+        FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
+        FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+        runLeftOuterHashJoin();
+        runIndexRightOuterJoin();
+        TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
+
+        PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+        PregelixHyracksIntegrationUtil.deinit();
+    }
+
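+    /** Expected-result job: in-memory hash join of orders with customers on customer key, sorted and written to a file. */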
+    private void runHashJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+                new int[] { 0 }, new IBinaryHashFunctionFactory[] { stringHashFactory },
+                new IBinaryComparatorFactory[] { stringComparatorFactory }, custOrderJoinDesc, 128);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        int[] sortFields = new int[2];
+        sortFields[0] = 1;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, custOrderJoinDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(EXPECTED_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, sorter, 0);
+        IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+        spec.connect(joinWriterConn, sorter, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
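+    /** Creates the empty customer B-tree, keyed on the first (customer key) field. */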
+    private void runCreate() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = stringComparatorFactory;
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
+                treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
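+    /** Sorts the customer table on its key and bulk loads it into the B-tree. */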
+    private void runBulkLoad() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        int[] sortFields = new int[1];
+        sortFields[0] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+        comparatorFactories[0] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        int[] fieldPermutation = new int[custDesc.getFields().length];
+        for (int i = 0; i < fieldPermutation.length; i++)
+            fieldPermutation[i] = i;
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
+                storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
+                fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+                NoOpOperationCallbackProvider.INSTANCE);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+                sorter, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
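+    /** Actual-result job: sorts orders on customer key and probes the customer B-tree with an index nested-loop join. */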
+    private void runIndexJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        /** sort operator */
+        int[] sortFields = new int[2];
+        sortFields[0] = 1;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        /** index join operator */
+        int[] keyFields = new int[1];
+        keyFields[0] = 1;
+        IBinaryComparatorFactory[] keyComparatorFactories = new IBinaryComparatorFactory[1];
+        keyComparatorFactories[0] = stringComparatorFactory;
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
+                storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+                typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
+                new BTreeDataflowHelperFactory());
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        /** results (already in sorted order) */
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(ACTUAL_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+                sorter, 0, join, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+                join, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
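+    /** Expected-result job: left outer hash join that preserves customers, projected to match the index join's field order. */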
+    private void runLeftOuterHashJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+        IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[] { JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE };
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+        InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+                new int[] { 1 }, new IBinaryHashFunctionFactory[] { stringHashFactory },
+                new IBinaryComparatorFactory[] { stringComparatorFactory }, custOrderJoinDesc, true,
+                nullWriterFactories, 128);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        int[] projectFields = new int[] { 8, 9, 10, 11, 12, 13, 14, 15, 16, 0, 1, 2, 3, 4, 5, 6, 7 };
+        ProjectOperatorDescriptor project = new ProjectOperatorDescriptor(spec, custOrderJoinDesc, projectFields);
+
+        int[] sortFields = new int[2];
+        sortFields[0] = 9;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, custOrderJoinDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(EXPECTED_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 1 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 0 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }));
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, project, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), project, 0, sorter, 0);
+        IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(new int[] { 9 },
+                        new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+        spec.connect(joinWriterConn, sorter, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
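+    /** Actual-result job: index right outer join that keeps customers without orders, padding order fields via the NULL writers. */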
+    private void runIndexRightOuterJoin() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+                new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+        IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+        INullWriterFactory[] nullWriterFactories = new INullWriterFactory[] { JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+                JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE };
+
+        RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+                UTF8StringSerializerDeserializer.INSTANCE });
+
+        FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+                        UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+        /** sort operator */
+        int[] sortFields = new int[2];
+        sortFields[0] = 1;
+        sortFields[1] = 0;
+        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+        comparatorFactories[0] = stringComparatorFactory;
+        comparatorFactories[1] = stringComparatorFactory;
+        ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+                comparatorFactories, ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+        /** index join operator */
+        int[] keyFields = new int[1];
+        keyFields[0] = 1;
+        IBinaryComparatorFactory[] keyComparatorFactories = new IBinaryComparatorFactory[1];
+        keyComparatorFactories[0] = stringComparatorFactory;
+        IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+        ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+        for (int i = 0; i < typeTraits.length; i++)
+            typeTraits[i] = new TypeTraits(false);
+        ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+                typeTraits));
+        IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
+                storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+                typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
+                new BTreeDataflowHelperFactory(), true, nullWriterFactories);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+        /** results (already in sorted order) */
+        FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(ACTUAL_RESULT_FILE)));
+        FileSplit[] results = new FileSplit[1];
+        results[0] = resultFile;
+        IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+        FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+                null);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+        PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+                sorter, 0, join, 0);
+
+        IBinaryComparatorFactory[] mergeComparatorFactories = new IBinaryComparatorFactory[2];
+        mergeComparatorFactories[0] = stringComparatorFactory;
+        mergeComparatorFactories[1] = stringComparatorFactory;
+        int[] mergeFields = new int[] { 9, 0 };
+        spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+                new int[] { 9 }, new IBinaryHashFunctionFactory[] { stringHashFactory }), mergeFields,
+                mergeComparatorFactories), join, 0, writer, 0);
+
+        spec.addRoot(writer);
+        runTest(spec);
+    }
+
+    private void runTest(JobSpecification spec) throws Exception {
+        PregelixHyracksIntegrationUtil.runJob(spec, HYRACKS_APP_NAME);
+    }
+}
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java
new file mode 100644
index 0000000..8f2e546
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.join;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
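+/** Null writer used by the outer join tests: emits the literal string "NULL" so padded fields remain comparable as text. */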
+public class JoinTestNullWriterFactory implements INullWriterFactory {
+    private static final long serialVersionUID = 1L;
+    public static final INullWriterFactory INSTANCE = new JoinTestNullWriterFactory();
+
+    @Override
+    public INullWriter createNullWriter() {
+        return new INullWriter() {
+
+            @Override
+            public void writeNull(DataOutput out) throws HyracksDataException {
+                UTF8StringSerializerDeserializer.INSTANCE.serialize("NULL", out);
+            }
+
+        };
+    }
+
+}
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java
new file mode 100644
index 0000000..83dd10d
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+public class TestUtils {
+
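+    /**
+     * Compares the expected and actual result files line by line and throws
+     * with a diff-style message at the first difference in content or length.
+     */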
+    public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+        BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+        BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+        String lineExpected, lineActual;
+        int num = 1;
+        try {
+            while ((lineExpected = readerExpected.readLine()) != null) {
+                lineActual = readerActual.readLine();
+                // Assert.assertEquals(lineExpected, lineActual);
+                if (lineActual == null) {
+                    throw new Exception("Actual result is missing line " + num + ":\n< " + lineExpected + "\n> ");
+                }
+                if (!equalStrings(lineExpected, lineActual)) {
+                    throw new Exception("Results differ at line " + num + ":\n< " + lineExpected + "\n> "
+                            + lineActual);
+                }
+                ++num;
+            }
+            lineActual = readerActual.readLine();
+            if (lineActual != null) {
+                throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+            }
+        } finally {
+            readerExpected.close();
+            readerActual.close();
+        }
+    }
+
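+    /**
+     * Field-wise line comparison: fields containing a '.' are treated as
+     * "name=value" pairs and compared numerically at float precision;
+     * all other fields must match exactly.
+     */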
+    private static boolean equalStrings(String s1, String s2) {
+        String[] rowsOne = s1.split("\n");
+        String[] rowsTwo = s2.split("\n");
+
+        if (rowsOne.length != rowsTwo.length)
+            return false;
+
+        for (int i = 0; i < rowsOne.length; i++) {
+            String row1 = rowsOne[i];
+            String row2 = rowsTwo[i];
+
+            if (row1.equals(row2))
+                continue;
+
+            String[] fields1 = row1.split(",");
+            String[] fields2 = row2.split(",");
+
+            if (fields1.length != fields2.length)
+                return false;
+
+            for (int j = 0; j < fields1.length; j++) {
+                if (fields1[j].equals(fields2[j])) {
+                    continue;
+                } else if (fields1[j].indexOf('.') < 0) {
+                    // Non-numeric fields must match exactly.
+                    return false;
+                } else {
+                    // Fields of the form "name=value" are compared numerically:
+                    // take the value after '=' and narrow to float so that
+                    // differences beyond single precision are tolerated.
+                    fields1[j] = fields1[j].split("=")[1];
+                    fields2[j] = fields2[j].split("=")[1];
+                    float float1 = (float) Double.parseDouble(fields1[j]);
+                    float float2 = (float) Double.parseDouble(fields2[j]);
+
+                    if (Math.abs(float1 - float2) == 0)
+                        continue;
+                    else {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+}
diff --git a/pregelix-core/src/test/resources/cluster/data.properties b/pregelix-core/src/test/resources/cluster/data.properties
new file mode 100644
index 0000000..daf881e
--- /dev/null
+++ b/pregelix-core/src/test/resources/cluster/data.properties
@@ -0,0 +1 @@
+store=teststore
\ No newline at end of file
diff --git a/pregelix-core/src/test/resources/hadoop/conf/core-site.xml b/pregelix-core/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..47dfac5
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+    <name>fs.default.name</name>
+    <value>hdfs://127.0.0.1:31888</value>
+</property>
+<property>
+    <name>hadoop.tmp.dir</name>
+    <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml b/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..8d29b1d
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+   <name>dfs.replication</name>
+   <value>1</value>
+</property>
+
+<property>
+	<name>dfs.block.size</name>
+	<value>65536</value>
+</property>
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hadoop/conf/log4j.properties b/pregelix-core/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..1b9a4d6
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+  <property>
+    <name>mapred.job.tracker</name>
+    <value>localhost:29007</value>
+  </property>
+  <property>
+     <name>mapred.tasktracker.map.tasks.maximum</name>
+     <value>20</value>
+  </property>
+   <property>
+      <name>mapred.tasktracker.reduce.tasks.maximum</name>
+      <value>20</value>
+   </property>
+   <property>
+      <name>mapred.min.split.size</name>
+      <value>65536</value>
+   </property>
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hyracks-deployment.properties b/pregelix-core/src/test/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..2ae9818
--- /dev/null
+++ b/pregelix-core/src/test/resources/hyracks-deployment.properties
@@ -0,0 +1,2 @@
+#cc.bootstrap.class=edu.uci.ics.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
\ No newline at end of file
diff --git a/pregelix-core/src/test/resources/log4j.properties b/pregelix-core/src/test/resources/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/pregelix-core/src/test/resources/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/pregelix-core/src/test/resources/logging.properties b/pregelix-core/src/test/resources/logging.properties
new file mode 100644
index 0000000..f43eb05
--- /dev/null
+++ b/pregelix-core/src/test/resources/logging.properties
@@ -0,0 +1,66 @@
+############################################################
+#  	Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.  
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+#  	Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler 
+# classes.  These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+
+# handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers.  For any given facility this global level
+# can be overridden by a facility specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+
+.level= WARNING
+# .level= INFO
+# .level= FINE
+# .level = FINEST
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+
+# java.util.logging.FileHandler.pattern = %h/java%u.log
+# java.util.logging.FileHandler.limit = 50000
+# java.util.logging.FileHandler.count = 1
+# java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+
+# Limit the messages that are printed on the console to WARNING and above.
+
+java.util.logging.ConsoleHandler.level = WARNING
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
+# For example, set the com.xyz.foo logger to only log SEVERE
+# messages:
+
+#edu.uci.ics.asterix.level = FINE
+#edu.uci.ics.algebricks.level = FINE
+edu.uci.ics.hyracks.level = INFO
+#edu.uci.ics.hyracks.control.nc.net.level = FINE
\ No newline at end of file