Add Pregelix codebase
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_staging@1960 123451ca-8445-de46-9d55-352943316053
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
new file mode 100644
index 0000000..171495a
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -0,0 +1,597 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.join;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
+import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
+import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
+import edu.uci.ics.pregelix.core.data.TypeTraits;
+import edu.uci.ics.pregelix.core.jobgen.clusterconfig.ClusterConfig;
+import edu.uci.ics.pregelix.core.util.PregelixHyracksIntegrationUtil;
+import edu.uci.ics.pregelix.core.util.TestUtils;
+import edu.uci.ics.pregelix.dataflow.std.FileWriteOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.IndexNestedLoopJoinOperatorDescriptor;
+import edu.uci.ics.pregelix.dataflow.std.ProjectOperatorDescriptor;
+import edu.uci.ics.pregelix.runtime.bootstrap.StorageManagerInterface;
+import edu.uci.ics.pregelix.runtime.bootstrap.TreeIndexRegistryProvider;
+
+public class JoinTest {
+ private final static String ACTUAL_RESULT_DIR = "actual";
+ private final static String EXPECT_RESULT_DIR = "expected";
+ private final static String ACTUAL_RESULT_FILE = ACTUAL_RESULT_DIR + File.separator + "join.txt";
+ private final static String EXPECTED_RESULT_FILE = EXPECT_RESULT_DIR + File.separator + "join.txt";
+ private final static String JOB_NAME = "JOIN_TEST";
+ private static final String HYRACKS_APP_NAME = "giraph";
+ private static final String NC1_ID = "nc1";
+ private static final String NC2_ID = "nc2";
+
+ private static final String PATH_TO_CLUSTER_STORE = "src/test/resources/cluster/data.properties";
+
+ private static final float DEFAULT_BTREE_FILL_FACTOR = 1.00f;
+ private IIndexRegistryProvider<IIndex> treeRegistry = TreeIndexRegistryProvider.INSTANCE;
+ private IStorageManagerInterface storageManagerInterface = StorageManagerInterface.INSTANCE;
+
+ private IBinaryHashFunctionFactory stringHashFactory = new PointableBinaryHashFunctionFactory(
+ UTF8StringPointable.FACTORY);
+ private IBinaryComparatorFactory stringComparatorFactory = new PointableBinaryComparatorFactory(
+ UTF8StringPointable.FACTORY);
+
+ private void cleanupStores() throws IOException {
+ FileUtils.forceMkdir(new File("teststore"));
+ FileUtils.forceMkdir(new File("build"));
+ FileUtils.cleanDirectory(new File("teststore"));
+ FileUtils.cleanDirectory(new File("build"));
+ }
+
+ @Test
+ public void customerOrderCIDJoinMulti() throws Exception {
+ ClusterConfig.setStorePath(PATH_TO_CLUSTER_STORE);
+ cleanupStores();
+ PregelixHyracksIntegrationUtil.init();
+ PregelixHyracksIntegrationUtil.createApp(HYRACKS_APP_NAME);
+
+ FileUtils.forceMkdir(new File(EXPECT_RESULT_DIR));
+ FileUtils.forceMkdir(new File(ACTUAL_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
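+ /** the hash-join jobs below produce the expected result file; the index-join jobs produce the actual file, which is then compared against it */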
+ runCreate();
+ runBulkLoad();
+ runHashJoin();
+ runIndexJoin();
+ TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
+
+ FileUtils.cleanDirectory(new File(EXPECT_RESULT_DIR));
+ FileUtils.cleanDirectory(new File(ACTUAL_RESULT_DIR));
+ runLeftOuterHashJoin();
+ runIndexRightOuterJoin();
+ TestUtils.compareWithResult(new File(EXPECTED_RESULT_FILE), new File(ACTUAL_RESULT_FILE));
+
+ PregelixHyracksIntegrationUtil.destroyApp(HYRACKS_APP_NAME);
+ PregelixHyracksIntegrationUtil.deinit();
+ }
+
+ private void runHashJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
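+ /** in-memory hash join: orders (input 0) probe on field 1 against customers (input 1) built on field 0, i.e. the customer key */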
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
+ new int[] { 0 }, new IBinaryHashFunctionFactory[] { stringHashFactory },
+ new IBinaryComparatorFactory[] { stringComparatorFactory }, custOrderJoinDesc, 128);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+ int[] sortFields = new int[2];
+ sortFields[0] = 1;
+ sortFields[1] = 0;
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+ comparatorFactories[0] = stringComparatorFactory;
+ comparatorFactories[1] = stringComparatorFactory;
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+ comparatorFactories, custOrderJoinDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
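+ /** single-partition writer on NC1 that produces the expected (reference) result file */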
+ FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(EXPECTED_RESULT_FILE)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+ IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { stringHashFactory }));
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { stringHashFactory }));
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, sorter, 0);
+ IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+ spec.connect(joinWriterConn, sorter, 0, writer, 0);
+
+ spec.addRoot(writer);
+ runTest(spec);
+ }
+
+ private void runCreate() throws Exception {
+ JobSpecification spec = new JobSpecification();
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = stringComparatorFactory;
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+ ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+ for (int i = 0; i < typeTraits.length; i++)
+ typeTraits[i] = new TypeTraits(false);
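+ /** create an empty B-tree, keyed on the first customer field, that the later bulk-load and index-join jobs use */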
+ TreeIndexCreateOperatorDescriptor writer = new TreeIndexCreateOperatorDescriptor(spec, storageManagerInterface,
+ treeRegistry, fileSplitProvider, typeTraits, comparatorFactories, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
+ spec.addRoot(writer);
+ runTest(spec);
+ }
+
+ private void runBulkLoad() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
+ int[] sortFields = new int[1];
+ sortFields[0] = 0;
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[1];
+ comparatorFactories[0] = stringComparatorFactory;
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+ comparatorFactories, custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+ int[] fieldPermutation = new int[custDesc.getFields().length];
+ for (int i = 0; i < fieldPermutation.length; i++)
+ fieldPermutation[i] = i;
+ ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+ for (int i = 0; i < typeTraits.length; i++)
+ typeTraits[i] = new TypeTraits(false);
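+ /** bulk load the customer tuples, sorted on the key field, into the B-tree created by runCreate() */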
+ TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ storageManagerInterface, treeRegistry, fileSplitProvider, typeTraits, comparatorFactories,
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
+ NoOpOperationCallbackProvider.INSTANCE);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+ sorter, 0, writer, 0);
+
+ spec.addRoot(writer);
+ runTest(spec);
+ }
+
+ private void runIndexJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ /** sort operator */
+ int[] sortFields = new int[2];
+ sortFields[0] = 1;
+ sortFields[1] = 0;
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+ comparatorFactories[0] = stringComparatorFactory;
+ comparatorFactories[1] = stringComparatorFactory;
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+ comparatorFactories, ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+ /** index join operator */
+ int[] keyFields = new int[1];
+ keyFields[0] = 1;
+ IBinaryComparatorFactory[] keyComparatorFactories = new IBinaryComparatorFactory[1];
+ keyComparatorFactories[0] = stringComparatorFactory;
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+ ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+ for (int i = 0; i < typeTraits.length; i++)
+ typeTraits[i] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
+ storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+ typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
+ new BTreeDataflowHelperFactory());
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+ /** results (already in sorted order) */
+ FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(ACTUAL_RESULT_FILE)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+ sorter, 0, join, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ keyFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories),
+ join, 0, writer, 0);
+
+ spec.addRoot(writer);
+ runTest(spec);
+ }
+
+ private void runLeftOuterHashJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/customer-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/customer-part2.tbl"))) };
+ IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits);
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[] { JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE };
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE }, '|'), custDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID);
+
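+ /** left outer hash join: customers (input 0) are preserved; the order fields of customers with no matching orders are filled by the null writers */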
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 0 },
+ new int[] { 1 }, new IBinaryHashFunctionFactory[] { stringHashFactory },
+ new IBinaryComparatorFactory[] { stringComparatorFactory }, custOrderJoinDesc, true,
+ nullWriterFactories, 128);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
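+ /** reorder so the nine order fields come first, followed by the eight customer fields (presumably to match the layout produced by the index right-outer join) */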
+ int[] projectFields = new int[] { 8, 9, 10, 11, 12, 13, 14, 15, 16, 0, 1, 2, 3, 4, 5, 6, 7 };
+ ProjectOperatorDescriptor project = new ProjectOperatorDescriptor(spec, custOrderJoinDesc, projectFields);
+
+ int[] sortFields = new int[2];
+ sortFields[0] = 9;
+ sortFields[1] = 0;
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+ comparatorFactories[0] = stringComparatorFactory;
+ comparatorFactories[1] = stringComparatorFactory;
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+ comparatorFactories, custOrderJoinDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+ FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(EXPECTED_RESULT_FILE)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+ IConnectorDescriptor ordJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 1 },
+ new IBinaryHashFunctionFactory[] { stringHashFactory }));
+ spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+ IConnectorDescriptor custJoinConn = new MToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 0 },
+ new IBinaryHashFunctionFactory[] { stringHashFactory }));
+ spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), join, 0, project, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), project, 0, sorter, 0);
+ IConnectorDescriptor joinWriterConn = new MToNPartitioningMergingConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] { 9 },
+ new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories);
+ spec.connect(joinWriterConn, sorter, 0, writer, 0);
+
+ spec.addRoot(writer);
+ runTest(spec);
+ }
+
+ private void runIndexRightOuterJoin() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new FileReference(new File("data/tpch0.001/orders-part1.tbl"))),
+ new FileSplit(NC2_ID, new FileReference(new File("data/tpch0.001/orders-part2.tbl"))) };
+ IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits);
+
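+ /** nine null writers, one per order field, used for customers that have no matching orders */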
+ INullWriterFactory[] nullWriterFactories = new INullWriterFactory[] { JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE,
+ JoinTestNullWriterFactory.INSTANCE, JoinTestNullWriterFactory.INSTANCE };
+
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE });
+
+ FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+ new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID);
+
+ /** sort operator */
+ int[] sortFields = new int[2];
+ sortFields[0] = 1;
+ sortFields[1] = 0;
+ IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[2];
+ comparatorFactories[0] = stringComparatorFactory;
+ comparatorFactories[1] = stringComparatorFactory;
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 1024, sortFields,
+ comparatorFactories, ordersDesc);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, sorter, NC1_ID, NC2_ID);
+
+ /** index join operator */
+ int[] keyFields = new int[1];
+ keyFields[0] = 1;
+ IBinaryComparatorFactory[] keyComparatorFactories = new IBinaryComparatorFactory[1];
+ keyComparatorFactories[0] = stringComparatorFactory;
+ IFileSplitProvider fileSplitProvider = ClusterConfig.getFileSplitProvider(JOB_NAME, JOB_NAME);
+ ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
+ for (int i = 0; i < typeTraits.length; i++)
+ typeTraits[i] = new TypeTraits(false);
+ ITreeIndexFrameFactory interiorFrameFactory = new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ ITreeIndexFrameFactory leafFrameFactory = new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(
+ typeTraits));
+ IndexNestedLoopJoinOperatorDescriptor join = new IndexNestedLoopJoinOperatorDescriptor(spec, custOrderJoinDesc,
+ storageManagerInterface, treeRegistry, fileSplitProvider, interiorFrameFactory, leafFrameFactory,
+ typeTraits, keyComparatorFactories, true, keyFields, keyFields, true, true,
+ new BTreeDataflowHelperFactory(), true, nullWriterFactories);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID);
+
+ /** results (already in sorted order) */
+ FileSplit resultFile = new FileSplit(NC1_ID, new FileReference(new File(ACTUAL_RESULT_FILE)));
+ FileSplit[] results = new FileSplit[1];
+ results[0] = resultFile;
+ IFileSplitProvider resultFileSplitProvider = new ConstantFileSplitProvider(results);
+ FileWriteOperatorDescriptor writer = new FileWriteOperatorDescriptor(spec, null, resultFileSplitProvider, null,
+ null);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, new String[] { NC1_ID });
+ PartitionConstraintHelper.addPartitionCountConstraint(spec, writer, 1);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ keyFields, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
+ UTF8StringPointable.FACTORY) }), sortFields, comparatorFactories), sorter, 0, join, 0);
+
+ IBinaryComparatorFactory[] mergeComparatorFactories = new IBinaryComparatorFactory[2];
+ mergeComparatorFactories[0] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
+ mergeComparatorFactories[1] = new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY);
+ int[] mergeFields = new int[] { 9, 0 };
+ spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ new int[] { 9 }, new IBinaryHashFunctionFactory[] { new PointableBinaryHashFunctionFactory(
+ UTF8StringPointable.FACTORY) }), mergeFields, mergeComparatorFactories), join, 0, writer, 0);
+
+ spec.addRoot(writer);
+ runTest(spec);
+ }
+
+ private void runTest(JobSpecification spec) throws Exception {
+ PregelixHyracksIntegrationUtil.runJob(spec, HYRACKS_APP_NAME);
+ }
+}
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java
new file mode 100644
index 0000000..8f2e546
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTestNullWriterFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.join;
+
+import java.io.DataOutput;
+
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriter;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+
+public class JoinTestNullWriterFactory implements INullWriterFactory {
+ private static final long serialVersionUID = 1L;
+ public static INullWriterFactory INSTANCE = new JoinTestNullWriterFactory();
+
+ @Override
+ public INullWriter createNullWriter() {
+ return new INullWriter() {
+
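+ // nulls are written as the literal string "NULL" so they show up in the text output files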
+ @Override
+ public void writeNull(DataOutput out) throws HyracksDataException {
+ UTF8StringSerializerDeserializer.INSTANCE.serialize("NULL", out);
+ }
+
+ };
+ }
+
+}
diff --git a/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java
new file mode 100644
index 0000000..83dd10d
--- /dev/null
+++ b/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/util/TestUtils.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.pregelix.core.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+
+public class TestUtils {
+
+ public static void compareWithResult(File expectedFile, File actualFile) throws Exception {
+ BufferedReader readerExpected = new BufferedReader(new FileReader(expectedFile));
+ BufferedReader readerActual = new BufferedReader(new FileReader(actualFile));
+ String lineExpected, lineActual;
+ int num = 1;
+ try {
+ while ((lineExpected = readerExpected.readLine()) != null) {
+ lineActual = readerActual.readLine();
+ // Assert.assertEquals(lineExpected, lineActual);
+ if (lineActual == null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< " + lineExpected + "\n> ");
+ }
+ if (!equalStrings(lineExpected, lineActual)) {
+ throw new Exception("Result for changed at line " + num + ":\n< " + lineExpected + "\n> "
+ + lineActual);
+ }
+ ++num;
+ }
+ lineActual = readerActual.readLine();
+ if (lineActual != null) {
+ throw new Exception("Actual result changed at line " + num + ":\n< \n> " + lineActual);
+ }
+ } finally {
+ readerExpected.close();
+ readerActual.close();
+ }
+ }
+
+ private static boolean equalStrings(String s1, String s2) {
+ String[] rowsOne = s1.split("\n");
+ String[] rowsTwo = s2.split("\n");
+
+ if (rowsOne.length != rowsTwo.length)
+ return false;
+
+ for (int i = 0; i < rowsOne.length; i++) {
+ String row1 = rowsOne[i];
+ String row2 = rowsTwo[i];
+
+ if (row1.equals(row2))
+ continue;
+
+ String[] fields1 = row1.split(",");
+ String[] fields2 = row2.split(",");
+
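+ // fields that differ textually but contain a '.' are treated as key=value pairs and their numeric values are compared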
+ for (int j = 0; j < fields1.length; j++) {
+ if (fields1[j].equals(fields2[j])) {
+ continue;
+ } else if (fields1[j].indexOf('.') < 0) {
+ return false;
+ } else {
+ fields1[j] = fields1[j].split("=")[1];
+ fields2[j] = fields2[j].split("=")[1];
+ Double double1 = Double.parseDouble(fields1[j]);
+ Double double2 = Double.parseDouble(fields2[j]);
+ float float1 = (float) double1.doubleValue();
+ float float2 = (float) double2.doubleValue();
+
+ if (Math.abs(float1 - float2) == 0)
+ continue;
+ else {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+}
diff --git a/pregelix-core/src/test/resources/cluster/data.properties b/pregelix-core/src/test/resources/cluster/data.properties
new file mode 100644
index 0000000..daf881e
--- /dev/null
+++ b/pregelix-core/src/test/resources/cluster/data.properties
@@ -0,0 +1 @@
+store=teststore
\ No newline at end of file
diff --git a/pregelix-core/src/test/resources/hadoop/conf/core-site.xml b/pregelix-core/src/test/resources/hadoop/conf/core-site.xml
new file mode 100644
index 0000000..47dfac5
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/core-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+ <name>fs.default.name</name>
+ <value>hdfs://127.0.0.1:31888</value>
+</property>
+<property>
+ <name>hadoop.tmp.dir</name>
+ <value>/tmp/hadoop</value>
+</property>
+
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml b/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml
new file mode 100644
index 0000000..8d29b1d
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/hdfs-site.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+<property>
+ <name>dfs.replication</name>
+ <value>1</value>
+</property>
+
+<property>
+ <name>dfs.block.size</name>
+ <value>65536</value>
+</property>
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hadoop/conf/log4j.properties b/pregelix-core/src/test/resources/hadoop/conf/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
new file mode 100644
index 0000000..1b9a4d6
--- /dev/null
+++ b/pregelix-core/src/test/resources/hadoop/conf/mapred-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+
+ <property>
+ <name>mapred.job.tracker</name>
+ <value>localhost:29007</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.map.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.tasktracker.reduce.tasks.maximum</name>
+ <value>20</value>
+ </property>
+ <property>
+ <name>mapred.min.split.size</name>
+ <value>65536</value>
+ </property>
+
+</configuration>
diff --git a/pregelix-core/src/test/resources/hyracks-deployment.properties b/pregelix-core/src/test/resources/hyracks-deployment.properties
new file mode 100644
index 0000000..2ae9818
--- /dev/null
+++ b/pregelix-core/src/test/resources/hyracks-deployment.properties
@@ -0,0 +1,2 @@
+#cc.bootstrap.class=edu.uci.ics.asterix.hyracks.bootstrap.CCBootstrapImpl
+nc.bootstrap.class=edu.uci.ics.pregelix.runtime.bootstrap.NCBootstrapImpl
\ No newline at end of file
diff --git a/pregelix-core/src/test/resources/log4j.properties b/pregelix-core/src/test/resources/log4j.properties
new file mode 100755
index 0000000..d5e6004
--- /dev/null
+++ b/pregelix-core/src/test/resources/log4j.properties
@@ -0,0 +1,94 @@
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=FATAL,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=FATAL
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# Rolling File Appender
+#
+
+#log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+#log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Logfile size and 30-day backups
+#log4j.appender.RFA.MaxFileSize=1MB
+#log4j.appender.RFA.MaxBackupIndex=30
+
+#log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} - %m%n
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+#
+# FSNamesystem Audit logging
+# All audit events are logged at INFO level
+#
+log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN
+
+# Custom Logging levels
+
+#log4j.logger.org.apache.hadoop.mapred.JobTracker=DEBUG
+#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG
+#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
+
+# Jets3t library
+log4j.logger.org.jets3t.service.impl.rest.httpclient.RestS3Service=ERROR
+
+#
+# Event Counter Appender
+# Sends counts of logging messages at different severity levels to Hadoop Metrics.
+#
+log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
diff --git a/pregelix-core/src/test/resources/logging.properties b/pregelix-core/src/test/resources/logging.properties
new file mode 100644
index 0000000..f43eb05
--- /dev/null
+++ b/pregelix-core/src/test/resources/logging.properties
@@ -0,0 +1,66 @@
+############################################################
+# Default Logging Configuration File
+#
+# You can use a different file by specifying a filename
+# with the java.util.logging.config.file system property.
+# For example java -Djava.util.logging.config.file=myfile
+############################################################
+
+############################################################
+# Global properties
+############################################################
+
+# "handlers" specifies a comma separated list of log Handler
+# classes. These handlers will be installed during VM startup.
+# Note that these classes must be on the system classpath.
+# By default we only configure a ConsoleHandler, which will only
+# show messages at the INFO and above levels.
+
+handlers= java.util.logging.ConsoleHandler
+
+# To also add the FileHandler, use the following line instead.
+
+# handlers= java.util.logging.FileHandler, java.util.logging.ConsoleHandler
+
+# Default global logging level.
+# This specifies which kinds of events are logged across
+# all loggers. For any given facility this global level
+# can be overridden by a facility-specific level
+# Note that the ConsoleHandler also has a separate level
+# setting to limit messages printed to the console.
+
+.level= WARNING
+# .level= INFO
+# .level= FINE
+# .level = FINEST
+
+############################################################
+# Handler specific properties.
+# Describes specific configuration info for Handlers.
+############################################################
+
+# default file output is in user's home directory.
+
+# java.util.logging.FileHandler.pattern = %h/java%u.log
+# java.util.logging.FileHandler.limit = 50000
+# java.util.logging.FileHandler.count = 1
+# java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
+
+# Limit the messages that are printed on the console to WARNING and above.
+
+java.util.logging.ConsoleHandler.level = WARNING
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
+
+
+############################################################
+# Facility specific properties.
+# Provides extra control for each logger.
+############################################################
+
+# For example, set the com.xyz.foo logger to only log SEVERE
+# messages:
+
+#edu.uci.ics.asterix.level = FINE
+#edu.uci.ics.algebricks.level = FINE
+edu.uci.ics.hyracks.level = INFO
+#edu.uci.ics.hyracks.control.nc.net.level = FINE
\ No newline at end of file