Added tests for the locality-aware connectors.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1152 123451ca-8445-de46-9d55-352943316053
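
The core of the new LocalityAwareConnectorTest is a HashtableLocalityMap built from a BitSet that selects which sender/consumer pairs are actually wired together. As a reading aid, here is a minimal, standalone sketch of that convention. It is only a sketch: the bit layout (sender * consumerCount + consumer) is an assumption inferred from how the test populates its BitSet, and LocalityMapSketch is a made-up name, not part of this change.

    import java.util.BitSet;

    public class LocalityMapSketch {
        public static void main(String[] args) {
            int senders = 4, consumers = 2;
            BitSet nodemap = new BitSet(senders * consumers);
            // Same bits the new test sets: pairs (0,0), (1,0), (2,1), (3,1).
            nodemap.set(0);
            nodemap.set(2);
            nodemap.set(5);
            nodemap.set(7);
            for (int s = 0; s < senders; s++) {
                for (int c = 0; c < consumers; c++) {
                    System.out.println("sender " + s + " -> consumer " + c + ": "
                            + nodemap.get(s * consumers + c));
                }
            }
        }
    }

Under that layout each of the two group-by consumers receives input from exactly two of the four file scanners, which is why each output file is expected to be the aggregate of two copies of lineitem.tbl.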
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
new file mode 100644
index 0000000..36c9cd2
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class AbstractMultiNCIntegrationTest {
+
+    private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
+
+    public static final String[] ASTERIX_IDS = { "asterix-001", "asterix-002", "asterix-003", "asterix-004",
+            "asterix-005", "asterix-006", "asterix-007" };
+
+    private static ClusterControllerService cc;
+    
+    private static NodeControllerService[] asterixNCs;
+    
+    private static IHyracksClientConnection hcc;
+    
+    private final List<File> outputFiles;
+
+    @Rule
+    public TemporaryFolder outputFolder = new TemporaryFolder();
+    
+    public AbstractMultiNCIntegrationTest() {
+        outputFiles = new ArrayList<File>();
+    }
+    
+    @BeforeClass
+    public static void init() throws Exception {
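+        // Bring up an in-process cluster controller on 127.0.0.1 (client port 39000,
+        // cluster port 39001) and one node controller per entry in ASTERIX_IDS, all
+        // attached to that cluster controller.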
+        CCConfig ccConfig = new CCConfig();
+        ccConfig.clientNetIpAddress = "127.0.0.1";
+        ccConfig.clientNetPort = 39000;
+        ccConfig.clusterNetIpAddress = "127.0.0.1";
+        ccConfig.clusterNetPort = 39001;
+        ccConfig.profileDumpPeriod = 10000;
+        File outDir = new File("target/ClusterController");
+        outDir.mkdirs();
+        File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir);
+        ccRoot.delete();
+        ccRoot.mkdir();
+        ccConfig.ccRoot = ccRoot.getAbsolutePath();
+        cc = new ClusterControllerService(ccConfig);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+        }
+        cc.start();
+        
+        asterixNCs = new NodeControllerService[ASTERIX_IDS.length];
+        for (int i = 0; i < ASTERIX_IDS.length; i++) {
+            NCConfig ncConfig = new NCConfig();
+            ncConfig.ccHost = "localhost";
+            ncConfig.ccPort = 39001;
+            ncConfig.clusterNetIPAddress = "127.0.0.1";
+            ncConfig.dataIPAddress = "127.0.0.1";
+            ncConfig.nodeId = ASTERIX_IDS[i];
+            asterixNCs[i] = new NodeControllerService(ncConfig);
+            asterixNCs[i].start();
+        }
+        
+        hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+        hcc.createApplication("test", null);
+    }
+    
+    @AfterClass
+    public static void deinit() throws Exception {
+        for (NodeControllerService nc : asterixNCs) {
+            nc.stop();
+        }
+        cc.stop();
+    }
+    
+    protected void runTest(JobSpecification spec) throws Exception {
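+        // Submit the job with runtime profiling enabled, wait for completion, and
+        // then dump whatever the test wrote to its temporary output files.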
+        JobId jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(spec.toJSON().toString(2));
+        }
+        hcc.start(jobId);
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info(jobId.toString());
+        }
+        hcc.waitForCompletion(jobId);
+        dumpOutputFiles();
+    }
+    
+    private void dumpOutputFiles() {
+        if (LOGGER.isLoggable(Level.INFO)) {
+            for (File f : outputFiles) {
+                if (f.exists() && f.isFile()) {
+                    try {
+                        LOGGER.info("Reading file: " + f.getAbsolutePath() + " in test: " + getClass().getName());
+                        String data = FileUtils.readFileToString(f);
+                        LOGGER.info(data);
+                    } catch (IOException e) {
+                        LOGGER.info("Error reading file: " + f.getAbsolutePath());
+                        LOGGER.info(e.getMessage());
+                    }
+                }
+            }
+        }
+    }
+    
+    protected File createTempFile() throws IOException {
+        File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot());
+        if (LOGGER.isLoggable(Level.INFO)) {
+            LOGGER.info("Output file: " + tempFile.getAbsolutePath());
+        }
+        outputFiles.add(tempFile);
+        return tempFile;
+    }
+    
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
new file mode 100644
index 0000000..a7a0a73
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.GlobalHashingLocalityMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.HashtableLocalityMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.LocalityAwareMToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+
+public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
+
+    final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+            new FileSplit("asterix-001", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+            new FileSplit("asterix-002", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+            new FileSplit("asterix-003", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+            new FileSplit("asterix-004", new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+
+    final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+            UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+            IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+            UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+    final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+            UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+            IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+            FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+            UTF8StringParserFactory.INSTANCE, }, '|');
+
+    /**
+     * Test of aggregation using the locality-aware connector. The two output files should be
+     * identical, and each should be the aggregate of two copies of lineitem.tbl.
+     * 
+     * Note that if the locality-aware hash partitioning is broken, the two files will usually
+     * differ; identical output files alone, however, do not prove that the partitioning is correct.
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void localityAwareAggregationTest() throws Exception {
+
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
+
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true),
+                                new FloatSumFieldAggregatorFactory(5, true) }),
+                outputRec, tableSize);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                "asterix-005", "asterix-006");
+        
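+        // Locality map for 4 senders x 2 consumers. Assuming the BitSet is indexed
+        // as sender * consumerCount + consumer, the bits below connect senders 0 and 1
+        // to consumer 0 and senders 2 and 3 to consumer 1, so each grouper partition
+        // aggregates exactly two copies of lineitem.tbl.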
+        BitSet nodemap = new BitSet(8);
+        
+        nodemap.set(0);
+        nodemap.set(2);
+        nodemap.set(5);
+        nodemap.set(7);
+
+        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new HashtableLocalityMap(nodemap));
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "localityAwareSumInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                "asterix-005", "asterix-006");
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+    
+    /**
+     * Test for the locality-aware connector using the global hashing node mapper. This should
+     * produce exactly the same result as using {@link MToNPartitioningConnectorDescriptor}.
+     *  
+     * @throws Exception
+     */
+    @Test
+    public void globalPartitionAggregationTest() throws Exception {
+
+        JobSpecification spec = new JobSpecification();
+
+        FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+                spec, splitProvider, tupleParserFactory, desc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+                csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+
+        RecordDescriptor outputRec = new RecordDescriptor(
+                new ISerializerDeserializer[] {
+                        UTF8StringSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        IntegerSerializerDeserializer.INSTANCE,
+                        FloatSerializerDeserializer.INSTANCE });
+
+        int[] keyFields = new int[] { 0 };
+        int tableSize = 8;
+
+        HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+                spec,
+                keyFields,
+                new FieldHashPartitionComputerFactory(
+                        keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+                        .of(UTF8StringPointable.FACTORY) },
+                new MultiFieldsAggregatorFactory(
+                        new IFieldAggregateDescriptorFactory[] {
+                                new IntSumFieldAggregatorFactory(1, true),
+                                new IntSumFieldAggregatorFactory(3, true),
+                                new FloatSumFieldAggregatorFactory(5, true) }),
+                outputRec, tableSize);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+                "asterix-005", "asterix-006");
+
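+        // The global hashing locality map should make this connector behave exactly
+        // like a plain MToNPartitioningConnectorDescriptor (see the Javadoc above),
+        // i.e. every sender can reach every consumer.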
+        IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keyFields,
+                        new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+                                .of(UTF8StringPointable.FACTORY) }),
+                new GlobalHashingLocalityMap());
+        spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+        AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+                "localityAwareSumInmemGroupTest");
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+                "asterix-005", "asterix-006");
+
+        IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+        spec.connect(conn2, grouper, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+    }
+    
+    private AbstractSingleActivityOperatorDescriptor getPrinter(
+            JobSpecification spec, String prefix) throws IOException {
+
+        AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+                spec, new ConstantFileSplitProvider(new FileSplit[] {
+                        new FileSplit("asterix-005", createTempFile()
+                                .getAbsolutePath()),
+                        new FileSplit("asterix-006", createTempFile()
+                                .getAbsolutePath()) }), "\t");
+
+        return printer;
+    }
+
+}