Removed commented-out Overlog code. Added auto-expand test.
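
The new test (customerOrderCIDJoinAutoExpand) mirrors the existing in-memory
hash join tests, but gives the join a PartitionCountConstraint(2) instead of
explicit node locations, exercising the scheduler's
expandpartitioncountconstraint path that replaces the removed Overlog ranking
rules.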
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@24 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index 8b7bcff..3f03526 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -122,48 +122,6 @@
watch(availablenodes, i);
watch(availablenodes, d);
-/*
-
-define(rankedavailablenodes_temp, keys(), {String, Integer});
-
-rankedavailablenodes_temp(NodeId, 0) :-
- availablenodes#insert(NodeId);
-
-rankedavailablenodes_temp(NodeId2, NewRank) :-
- rankedavailablenodes_temp#insert(NodeId1, Rank),
- rankedavailablenodes_temp(NodeId2, Rank),
- NodeId1 < NodeId2
- {
- NewRank := Rank + 1;
- };
-
-rankedavailablenodes_temp(NodeId2, NewRank) :-
- rankedavailablenodes_temp(NodeId1, Rank),
- rankedavailablenodes_temp#insert(NodeId2, Rank),
- NodeId1 < NodeId2
- {
- NewRank := Rank + 1;
- };
-
-rankedavailablenodes_temp(NodeId, Rank) :-
- availablenodes(NodeId),
- rankedavailablenodes_temp(NodeId, Rank);
-
-rankedavailablenodes_temp(NodeId, NewRank) :-
- rankedavailablenodes_temp(NodeId, Rank),
- Rank != 0,
- notin rankedavailablenodes_temp(_, Rank - 1)
- {
- NewRank := Rank - 1;
- };
-
-define(rankedavailablenodes, keys(0), {String, Integer});
-
-rankedavailablenodes(NodeId, max<Rank>) :-
- rankedavailablenodes_temp(NodeId, Rank);
-
-*/
-
define(availablenodecount, keys(0), {Integer, Integer});
watch(availablenodecount, a);
@@ -213,41 +171,8 @@
operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
operatorclonecount(JobId, OperatorId, NPartitions);
-/*
-define(operatorclonecountexpansion, keys(0, 1), {UUID, OperatorDescriptorId, Integer});
-
-operatorclonecountexpansion(JobId, OperatorId, 0) :-
- operatorclonecount(JobId, OperatorId, _);
-
-operatorclonecountexpansion(JobId, OperatorId, Partition + 1) :-
- operatorclonecountexpansion#insert(JobId, OperatorId, Partition),
- operatorclonecount(JobId, OperatorId, NPartitions),
- Partition < NPartitions - 1;
-*/
-
define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
-/*
-operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, 0) :-
- operatorclonecountexpansion#insert(JobId, OperatorId, Partition);
-
-operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
- operatorclonecountexpansiontotalorder#insert(JobId, OperatorId1, Partition1, Rank),
- operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, Rank),
- OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
- {
- NewRank := Rank + 1;
- };
-
-operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
- operatorclonecountexpansiontotalorder(JobId, OperatorId1, Partition1, Rank),
- operatorclonecountexpansiontotalorder#insert(JobId, OperatorId2, Partition2, Rank),
- OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
- {
- NewRank := Rank + 1;
- };
-*/
-
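+/* Clone ranks are now produced directly by the expandpartitioncountconstraint
+   function instead of the recursive ranking rules removed above. */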
operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
@@ -259,12 +184,6 @@
watch(operatorclonecount, i);
watch(operatorclonecount, d);
-/*
-watch(operatorclonecountexpansion, a);
-watch(operatorclonecountexpansion, i);
-watch(operatorclonecountexpansion, d);
-*/
-
define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 7494f11..6770fd8 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -22,6 +22,7 @@
import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
@@ -259,6 +260,115 @@
}
@Test
+ public void customerOrderCIDJoinAutoExpand() throws Exception {
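+ // Same pipeline as the other CID join tests, except that the join declares
+ // only a partition count (2) and lets the scheduler choose its nodes.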
+ JobSpecification spec = new JobSpecification();
+
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+ };
+ RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
+
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
+ RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
+
+ RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
+
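+ // The scanners remain explicitly pinned to NC1/NC2, where their file splits live.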
+ CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
+ ordScanner.setPartitionConstraint(ordersPartitionConstraint);
+
+ CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
+ custScanner.setPartitionConstraint(custPartitionConstraint);
+
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
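+ // Auto-expand under test: request two join clones without naming nodes; the
+ // scheduler's expandpartitioncountconstraint rule assigns their locations.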
+ join.setPartitionConstraint(new PartitionCountConstraint(2));
+
+ PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
+ printer.setPartitionConstraint(printerPartitionConstraint);
+
+ IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
+ spec.connect(ordJoinConn, ordScanner, 0, join, 0);
+
+ IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
+ spec.connect(custJoinConn, custScanner, 0, join, 1);
+
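+ // With a single printer partition, the replicating connector effectively
+ // gathers the output of both join clones onto NC1.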
+ IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
+ spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+ spec.addRoot(printer);
+ runTest(spec);
+ }
+
+ @Test
public void customerOrderCIDJoinMultiMaterialized() throws Exception {
JobSpecification spec = new JobSpecification();