Added tests for the locality-aware connectors.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_dev_next@1152 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
new file mode 100644
index 0000000..36c9cd2
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import edu.uci.ics.hyracks.api.client.HyracksConnection;
+import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
+import edu.uci.ics.hyracks.api.job.JobFlag;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
+import edu.uci.ics.hyracks.control.common.controllers.CCConfig;
+import edu.uci.ics.hyracks.control.common.controllers.NCConfig;
+import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+
+public class AbstractMultiNCIntegrationTest {
+
+ private static final Logger LOGGER = Logger.getLogger(AbstractMultiNCIntegrationTest.class.getName());
+
+ public static final String[] ASTERIX_IDS = {"asterix-001","asterix-002","asterix-003","asterix-004","asterix-005","asterix-006","asterix-007"};
+
+ private static ClusterControllerService cc;
+
+ private static NodeControllerService[] asterixNCs;
+
+ private static IHyracksClientConnection hcc;
+
+ private final List<File> outputFiles;
+
+ @Rule
+ public TemporaryFolder outputFolder = new TemporaryFolder();
+
+ public AbstractMultiNCIntegrationTest(){
+ outputFiles = new ArrayList<File>();;
+ }
+
+ @BeforeClass
+ public static void init() throws Exception {
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = "127.0.0.1";
+ ccConfig.clientNetPort = 39000;
+ ccConfig.clusterNetIpAddress = "127.0.0.1";
+ ccConfig.clusterNetPort = 39001;
+ ccConfig.profileDumpPeriod = 10000;
+ File outDir = new File("target/ClusterController");
+ outDir.mkdirs();
+ File ccRoot = File.createTempFile(AbstractMultiNCIntegrationTest.class.getName(), ".data", outDir);
+ ccRoot.delete();
+ ccRoot.mkdir();
+ ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ cc = new ClusterControllerService(ccConfig);
+ cc.start();
+
+ asterixNCs = new NodeControllerService[ASTERIX_IDS.length];
+ for(int i = 0; i < ASTERIX_IDS.length; i++){
+ NCConfig ncConfig = new NCConfig();
+ ncConfig.ccHost = "localhost";
+ ncConfig.ccPort = 39001;
+ ncConfig.clusterNetIPAddress = "127.0.0.1";
+ ncConfig.dataIPAddress = "127.0.0.1";
+ ncConfig.nodeId = ASTERIX_IDS[i];
+ asterixNCs[i] = new NodeControllerService(ncConfig);
+ asterixNCs[i].start();
+ }
+
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress, ccConfig.clientNetPort);
+ hcc.createApplication("test", null);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Starting CC in " + ccRoot.getAbsolutePath());
+ }
+ }
+
+ @AfterClass
+ public static void deinit() throws Exception {
+ for(NodeControllerService nc : asterixNCs){
+ nc.stop();
+ }
+ cc.stop();
+ }
+
+ protected void runTest(JobSpecification spec) throws Exception {
+ JobId jobId = hcc.createJob("test", spec, EnumSet.of(JobFlag.PROFILE_RUNTIME));
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(spec.toJSON().toString(2));
+ }
+ hcc.start(jobId);
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info(jobId.toString());
+ }
+ hcc.waitForCompletion(jobId);
+ dumpOutputFiles();
+ }
+
+ private void dumpOutputFiles() {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ for (File f : outputFiles) {
+ if (f.exists() && f.isFile()) {
+ try {
+ LOGGER.info("Reading file: " + f.getAbsolutePath() + " in test: " + getClass().getName());
+ String data = FileUtils.readFileToString(f);
+ LOGGER.info(data);
+ } catch (IOException e) {
+ LOGGER.info("Error reading file: " + f.getAbsolutePath());
+ LOGGER.info(e.getMessage());
+ }
+ }
+ }
+ }
+ }
+
+ protected File createTempFile() throws IOException {
+ File tempFile = File.createTempFile(getClass().getName(), ".tmp", outputFolder.getRoot());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Output file: " + tempFile.getAbsolutePath());
+ }
+ outputFiles.add(tempFile);
+ return tempFile;
+ }
+
+}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
new file mode 100644
index 0000000..a7a0a73
--- /dev/null
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.tests.integration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.junit.Test;
+
+import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.data.std.primitive.UTF8StringPointable;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.FloatSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.FloatParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.IntegerParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.GlobalHashingLocalityMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.HashtableLocalityMap;
+import edu.uci.ics.hyracks.dataflow.std.connectors.LocalityAwareMToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
+import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
+import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.HashGroupOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.group.IFieldAggregateDescriptorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.FloatSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.IntSumFieldAggregatorFactory;
+import edu.uci.ics.hyracks.dataflow.std.group.aggregators.MultiFieldsAggregatorFactory;
+
+public class LocalityAwareConnectorTest extends AbstractMultiNCIntegrationTest {
+
+ final IFileSplitProvider splitProvider = new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit("asterix-001", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+ new FileSplit("asterix-002", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+ new FileSplit("asterix-003", new FileReference(new File("data/tpch0.001/lineitem.tbl"))),
+ new FileSplit("asterix-004", new FileReference(new File("data/tpch0.001/lineitem.tbl"))) });
+
+ final RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE,
+ UTF8StringSerializerDeserializer.INSTANCE, UTF8StringSerializerDeserializer.INSTANCE });
+
+ final ITupleParserFactory tupleParserFactory = new DelimitedDataTupleParserFactory(new IValueParserFactory[] {
+ UTF8StringParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE,
+ IntegerParserFactory.INSTANCE, IntegerParserFactory.INSTANCE, FloatParserFactory.INSTANCE,
+ FloatParserFactory.INSTANCE, FloatParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE,
+ UTF8StringParserFactory.INSTANCE, }, '|');
+
+ /**
+ * Test of aggregations using locality aware connector. The output two files should be the
+ * same, each of which is the aggregation of two copies of the lineitem.tbl.
+ *
+ * Note that if the hashing connector is not working correctly, the two files may be different. This
+ * also means that even the output files are the same, the hashing may have other problems.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void localityAwareAggregationTest() throws Exception {
+
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
+
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true) }),
+ outputRec, tableSize);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ "asterix-005", "asterix-006");
+
+ BitSet nodemap = new BitSet(8);
+
+ nodemap.set(0);
+ nodemap.set(2);
+ nodemap.set(5);
+ nodemap.set(7);
+
+ IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }), new HashtableLocalityMap(nodemap));
+
+ new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "localityAwareSumInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ "asterix-005", "asterix-006");
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ /**
+ * Test for locality aware connector, using the global hashing node mapper. This should have
+ * the exactly the same result as using {@link MToNPartitioningConnectorDescriptor}.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void globalPartitionAggregationTest() throws Exception {
+
+ JobSpecification spec = new JobSpecification();
+
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
+ spec, splitProvider, tupleParserFactory, desc);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
+ csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+
+ RecordDescriptor outputRec = new RecordDescriptor(
+ new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE,
+ FloatSerializerDeserializer.INSTANCE });
+
+ int[] keyFields = new int[] { 0 };
+ int tableSize = 8;
+
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
+ spec,
+ keyFields,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }),
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
+ .of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true),
+ new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true) }),
+ outputRec, tableSize);
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
+ "asterix-005", "asterix-006");
+
+ IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }), new GlobalHashingLocalityMap());
+
+ new MToNPartitioningConnectorDescriptor(
+ spec,
+ new FieldHashPartitionComputerFactory(
+ keyFields,
+ new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
+ .of(UTF8StringPointable.FACTORY) }));
+ spec.connect(conn1, csvScanner, 0, grouper, 0);
+
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
+ "localityAwareSumInmemGroupTest");
+
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
+ "asterix-005", "asterix-006");
+
+ IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
+ spec.connect(conn2, grouper, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ private AbstractSingleActivityOperatorDescriptor getPrinter(
+ JobSpecification spec, String prefix) throws IOException {
+
+ AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
+ spec, new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit("asterix-005", createTempFile()
+ .getAbsolutePath()),
+ new FileSplit("asterix-006", createTempFile()
+ .getAbsolutePath()) }), "\t");
+
+ return printer;
+ }
+
+}