Fault-recovery works. Added auto-scheduling strategy.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks-next@23 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
index 858bb65..bbd4b4f 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/AbsoluteLocationConstraint.java
@@ -27,8 +27,8 @@
}
@Override
- public ConstraintType getConstraintType() {
- return ConstraintType.ABSOLUTE;
+ public LocationConstraintType getConstraintType() {
+ return LocationConstraintType.ABSOLUTE;
}
public String getLocationId() {
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
index c14345f..edfb1c5 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ChoiceLocationConstraint.java
@@ -28,7 +28,7 @@
}
@Override
- public ConstraintType getConstraintType() {
- return ConstraintType.CHOICE;
+ public LocationConstraintType getConstraintType() {
+ return LocationConstraintType.CHOICE;
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
new file mode 100644
index 0000000..16b7b24
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/ExplicitPartitionConstraint.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+import java.util.Arrays;
+
+public class ExplicitPartitionConstraint extends PartitionConstraint {
+ private static final long serialVersionUID = 1L;
+
+ private final LocationConstraint[] locationConstraints;
+
+ public ExplicitPartitionConstraint(LocationConstraint[] locationConstraints) {
+ this.locationConstraints = locationConstraints;
+ }
+
+ public LocationConstraint[] getLocationConstraints() {
+ return locationConstraints;
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.deepToString(locationConstraints);
+ }
+
+ @Override
+ public PartitionConstraintType getPartitionConstraintType() {
+ return PartitionConstraintType.EXPLICIT;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
index d34b6b2..1228b90 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/LocationConstraint.java
@@ -19,10 +19,10 @@
public abstract class LocationConstraint implements Serializable {
private static final long serialVersionUID = 1L;
- public enum ConstraintType {
+ public enum LocationConstraintType {
ABSOLUTE,
CHOICE
}
- public abstract ConstraintType getConstraintType();
+ public abstract LocationConstraintType getConstraintType();
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
index c825539..82a6f7b 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionConstraint.java
@@ -15,22 +15,14 @@
package edu.uci.ics.hyracks.api.constraints;
import java.io.Serializable;
-import java.util.Arrays;
-public class PartitionConstraint implements Serializable {
+public abstract class PartitionConstraint implements Serializable {
private static final long serialVersionUID = 1L;
- private final LocationConstraint[] locationConstraints;
+ public abstract PartitionConstraintType getPartitionConstraintType();
- public PartitionConstraint(LocationConstraint[] locationConstraints) {
- this.locationConstraints = locationConstraints;
- }
-
- public LocationConstraint[] getLocationConstraints() {
- return locationConstraints;
- }
-
- public String toString() {
- return Arrays.deepToString(locationConstraints);
+ public enum PartitionConstraintType {
+ EXPLICIT,
+ COUNT
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
new file mode 100644
index 0000000..b49d880
--- /dev/null
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/api/constraints/PartitionCountConstraint.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.constraints;
+
+public class PartitionCountConstraint extends PartitionConstraint {
+ private static final long serialVersionUID = 1L;
+
+ private final int count;
+
+ public PartitionCountConstraint(int count) {
+ this.count = count;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ @Override
+ public PartitionConstraintType getPartitionConstraintType() {
+ return PartitionConstraintType.COUNT;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
index 89f7566..ea8ede3 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionEntry.java
@@ -63,43 +63,44 @@
public boolean dispatch(SelectionKey key) throws IOException {
if (aborted) {
recvListener.dataReceived(this);
- }
- if (key.isReadable()) {
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
- }
- int bytesRead = socketChannel.read(readBuffer);
- if (bytesRead < 0) {
- recvListener.eos(this);
- return true;
- }
- if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
- }
- recvListener.dataReceived(this);
- } else if (key.isWritable()) {
- synchronized (this) {
- writeBuffer.flip();
+ } else {
+ if (key.isReadable()) {
if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
}
- int bytesWritten = socketChannel.write(writeBuffer);
- if (bytesWritten < 0) {
+ int bytesRead = socketChannel.read(readBuffer);
+ if (bytesRead < 0) {
+ recvListener.eos(this);
return true;
}
if (LOGGER.isLoggable(Level.FINER)) {
- LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
}
- if (writeBuffer.remaining() <= 0) {
- int ops = key.interestOps();
- key.interestOps(ops & ~SelectionKey.OP_WRITE);
+ recvListener.dataReceived(this);
+ } else if (key.isWritable()) {
+ synchronized (this) {
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ }
+ int bytesWritten = socketChannel.write(writeBuffer);
+ if (bytesWritten < 0) {
+ return true;
+ }
+ if (LOGGER.isLoggable(Level.FINER)) {
+ LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
+ }
+ if (writeBuffer.remaining() <= 0) {
+ int ops = key.interestOps();
+ key.interestOps(ops & ~SelectionKey.OP_WRITE);
+ }
+ writeBuffer.compact();
+ notifyAll();
}
- writeBuffer.compact();
- notifyAll();
+ } else {
+ LOGGER.warning("Spurious event triggered: " + key.readyOps());
+ return true;
}
- } else {
- LOGGER.warning("Spurious event triggered: " + key.readyOps());
- return true;
}
return false;
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
index 464a95c..55e2962 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/ConnectionManager.java
@@ -187,9 +187,7 @@
}
}
}
- synchronized (dataListenerThread) {
- dataListenerThread.pendingAbortConnections.addAll(abortConnections);
- }
+ dataListenerThread.addPendingAbortConnections(abortConnections);
}
private final class NetworkFrameWriter implements IFrameWriter {
@@ -271,11 +269,15 @@
}
synchronized void addSocketChannel(SocketChannel sc) throws IOException {
- LOGGER.info("Connection received");
pendingNewSockets.add(sc);
selector.wakeup();
}
+ synchronized void addPendingAbortConnections(List<IConnectionEntry> abortConnections) {
+ pendingAbortConnections.addAll(abortConnections);
+ selector.wakeup();
+ }
+
@Override
public void run() {
while (!stopped) {
@@ -309,6 +311,7 @@
connections.remove(ce);
}
}
+ pendingAbortConnections.clear();
}
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Selector: " + n);
@@ -335,7 +338,7 @@
}
}
}
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
index 2cfcb94..694e61a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/DemuxDataReceiveListenerFactory.java
@@ -75,8 +75,10 @@
LOGGER.finest("NonDeterministicDataReceiveListener: frame received: sender = " + senderIndex);
}
SelectionKey key = entry.getSelectionKey();
- int ops = key.interestOps();
- key.interestOps(ops & ~SelectionKey.OP_READ);
+ if (key.isValid()) {
+ int ops = key.interestOps();
+ key.interestOps(ops & ~SelectionKey.OP_READ);
+ }
readyBits.set(senderIndex);
notifyAll();
return;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
index 2ae094e..70f0248 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/comm/io/FrameTupleAppender.java
@@ -131,7 +131,7 @@
int fLen = accessor.getFieldEndOffset(tIndex, fields[i])
- accessor.getFieldStartOffset(tIndex, fields[i]);
System.arraycopy(accessor.getBuffer().array(), fSrcStart, buffer.array(), tupleDataEndOffset
- + fSrcSlotsLength + fStartOffset, fLen);
+ + fTargetSlotsLength + fStartOffset, fLen);
fEndOffset += fLen;
buffer.putShort(tupleDataEndOffset + i * 2, (short) fEndOffset);
fStartOffset = fEndOffset;
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
index 4335b87..9d7862a 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/clustercontroller/JOLJobManagerImpl.java
@@ -14,6 +14,7 @@
*/
package edu.uci.ics.hyracks.controller.clustercontroller;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -21,21 +22,27 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Logger;
import jol.core.Runtime;
import jol.types.basic.BasicTupleSet;
import jol.types.basic.Tuple;
import jol.types.basic.TupleSet;
import jol.types.exception.BadKeyException;
+import jol.types.exception.UpdateException;
import jol.types.table.BasicTable;
import jol.types.table.EventTable;
+import jol.types.table.Function;
import jol.types.table.Key;
import jol.types.table.TableName;
import edu.uci.ics.hyracks.api.comm.Endpoint;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
import edu.uci.ics.hyracks.api.constraints.ChoiceLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
+import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.Direction;
@@ -54,18 +61,26 @@
import edu.uci.ics.hyracks.api.job.statistics.StageletStatistics;
public class JOLJobManagerImpl implements IJobManager {
+ private static final Logger LOGGER = Logger.getLogger(JOLJobManagerImpl.class.getName());
+
public static final String JOL_SCOPE = "hyrackscc";
private static final String SCHEDULER_OLG_FILE = "edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg";
private final Runtime jolRuntime;
+ private final LinkedBlockingQueue<Runnable> jobQueue;
+
private final JobTable jobTable;
+ private final JobQueueThread jobQueueThread;
+
private final OperatorDescriptorTable odTable;
private final OperatorLocationTable olTable;
+ private final OperatorCloneCountTable ocTable;
+
private final ConnectorDescriptorTable cdTable;
private final ActivityNodeTable anTable;
@@ -88,17 +103,28 @@
private final AvailableNodesTable availableNodesTable;
+ private final RankedAvailableNodesTable rankedAvailableNodesTable;
+
private final FailedNodesTable failedNodesTable;
private final AbortMessageTable abortMessageTable;
private final AbortNotifyTable abortNotifyTable;
+ private final ExpandPartitionCountConstraintTableFunction expandPartitionCountConstraintFunction;
+
+ private final List<String> rankedAvailableNodes;
+
public JOLJobManagerImpl(final ClusterControllerService ccs, final Runtime jolRuntime) throws Exception {
this.jolRuntime = jolRuntime;
+ jobQueue = new LinkedBlockingQueue<Runnable>();
+ jobQueueThread = new JobQueueThread();
+ jobQueueThread.start();
+
this.jobTable = new JobTable(jolRuntime);
this.odTable = new OperatorDescriptorTable(jolRuntime);
this.olTable = new OperatorLocationTable(jolRuntime);
+ this.ocTable = new OperatorCloneCountTable(jolRuntime);
this.cdTable = new ConnectorDescriptorTable(jolRuntime);
this.anTable = new ActivityNodeTable(jolRuntime);
this.acTable = new ActivityConnectionTable(jolRuntime);
@@ -110,13 +136,17 @@
this.stageletCompleteTable = new StageletCompleteTable(jolRuntime);
this.stageletFailureTable = new StageletFailureTable(jolRuntime);
this.availableNodesTable = new AvailableNodesTable(jolRuntime);
+ this.rankedAvailableNodesTable = new RankedAvailableNodesTable(jolRuntime);
this.failedNodesTable = new FailedNodesTable(jolRuntime);
this.abortMessageTable = new AbortMessageTable(jolRuntime);
this.abortNotifyTable = new AbortNotifyTable(jolRuntime);
+ this.expandPartitionCountConstraintFunction = new ExpandPartitionCountConstraintTableFunction();
+ this.rankedAvailableNodes = new ArrayList<String>();
jolRuntime.catalog().register(jobTable);
jolRuntime.catalog().register(odTable);
jolRuntime.catalog().register(olTable);
+ jolRuntime.catalog().register(ocTable);
jolRuntime.catalog().register(cdTable);
jolRuntime.catalog().register(anTable);
jolRuntime.catalog().register(acTable);
@@ -128,9 +158,11 @@
jolRuntime.catalog().register(stageletCompleteTable);
jolRuntime.catalog().register(stageletFailureTable);
jolRuntime.catalog().register(availableNodesTable);
+ jolRuntime.catalog().register(rankedAvailableNodesTable);
jolRuntime.catalog().register(failedNodesTable);
jolRuntime.catalog().register(abortMessageTable);
jolRuntime.catalog().register(abortNotifyTable);
+ jolRuntime.catalog().register(expandPartitionCountConstraintFunction);
jobTable.register(new JobTable.Callback() {
@Override
@@ -153,83 +185,96 @@
@SuppressWarnings("unchecked")
@Override
public void insertion(TupleSet tuples) {
- try {
- for (Tuple t : tuples) {
- Object[] data = t.toArray();
- UUID jobId = (UUID) data[0];
- UUID stageId = (UUID) data[1];
- Integer attempt = (Integer) data[2];
- JobPlan plan = (JobPlan) data[3];
- Set<List> ts = (Set<List>) data[4];
- Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- Set<List> activityInfoSet = (Set<List>) t2Data[1];
- for (List l : activityInfoSet) {
- Object[] lData = l.toArray();
- ActivityNodeId aid = (ActivityNodeId) lData[0];
- Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
- if (opParts == null) {
- opParts = new HashSet<Integer>();
- opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+ for (final Tuple t : tuples) {
+ jobQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ Integer attempt = (Integer) data[2];
+ JobPlan plan = (JobPlan) data[3];
+ Set<List> ts = (Set<List>) data[4];
+ Map<OperatorDescriptorId, Set<Integer>> opPartitions = new HashMap<OperatorDescriptorId, Set<Integer>>();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> opParts = opPartitions.get(aid.getOperatorDescriptorId());
+ if (opParts == null) {
+ opParts = new HashSet<Integer>();
+ opPartitions.put(aid.getOperatorDescriptorId(), opParts);
+ }
+ opParts.add((Integer) lData[1]);
+ }
}
- opParts.add((Integer) lData[1]);
+ ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
+ .size()];
+ int i = 0;
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
+ p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId,
+ plan, stageId, attempt, tasks, opPartitions);
+ }
+ LOGGER.info("Stage start - Phase 1");
+ Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
+ new ClusterControllerService.PortMapMergingAccumulator());
+
+ ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
+ .size()];
+ ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
+ .size()];
+ ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
+ .size()];
+ i = 0;
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
+ Set<List> activityInfoSet = (Set<List>) t2Data[1];
+ for (List l : activityInfoSet) {
+ Object[] lData = l.toArray();
+ ActivityNodeId aid = (ActivityNodeId) lData[0];
+ Set<Integer> aParts = tasks.get(aid);
+ if (aParts == null) {
+ aParts = new HashSet<Integer>();
+ tasks.put(aid, aParts);
+ }
+ aParts.add((Integer) lData[1]);
+ }
+ p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId,
+ plan, stageId, tasks, opPartitions, globalPortMap);
+ p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId,
+ stageId);
+ ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId,
+ stageId);
+ ++i;
+ }
+ LOGGER.info("Stage start - Phase 2");
+ ccs.runRemote(p2is, null);
+ LOGGER.info("Stage start - Phase 3");
+ ccs.runRemote(p3is, null);
+ LOGGER.info("Stage start");
+ ccs.runRemote(ss, null);
+ LOGGER.info("Stage started");
+ } catch (Exception e) {
}
}
- ClusterControllerService.Phase1Installer[] p1is = new ClusterControllerService.Phase1Installer[ts
- .size()];
- int i = 0;
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
- Set<List> activityInfoSet = (Set<List>) t2Data[1];
- for (List l : activityInfoSet) {
- Object[] lData = l.toArray();
- ActivityNodeId aid = (ActivityNodeId) lData[0];
- Set<Integer> aParts = tasks.get(aid);
- if (aParts == null) {
- aParts = new HashSet<Integer>();
- tasks.put(aid, aParts);
- }
- aParts.add((Integer) lData[1]);
- }
- p1is[i++] = new ClusterControllerService.Phase1Installer((String) t2Data[0], jobId, plan,
- stageId, attempt, tasks, opPartitions);
- }
- Map<PortInstanceId, Endpoint> globalPortMap = ccs.runRemote(p1is,
- new ClusterControllerService.PortMapMergingAccumulator());
- ClusterControllerService.Phase2Installer[] p2is = new ClusterControllerService.Phase2Installer[ts
- .size()];
- ClusterControllerService.Phase3Installer[] p3is = new ClusterControllerService.Phase3Installer[ts
- .size()];
- ClusterControllerService.StageStarter[] ss = new ClusterControllerService.StageStarter[ts
- .size()];
- i = 0;
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- Map<ActivityNodeId, Set<Integer>> tasks = new HashMap<ActivityNodeId, Set<Integer>>();
- Set<List> activityInfoSet = (Set<List>) t2Data[1];
- for (List l : activityInfoSet) {
- Object[] lData = l.toArray();
- ActivityNodeId aid = (ActivityNodeId) lData[0];
- Set<Integer> aParts = tasks.get(aid);
- if (aParts == null) {
- aParts = new HashSet<Integer>();
- tasks.put(aid, aParts);
- }
- aParts.add((Integer) lData[1]);
- }
- p2is[i] = new ClusterControllerService.Phase2Installer((String) t2Data[0], jobId, plan,
- stageId, tasks, opPartitions, globalPortMap);
- p3is[i] = new ClusterControllerService.Phase3Installer((String) t2Data[0], jobId, stageId);
- ss[i] = new ClusterControllerService.StageStarter((String) t2Data[0], jobId, stageId);
- ++i;
- }
- ccs.runRemote(p2is, null);
- ccs.runRemote(p3is, null);
- ccs.runRemote(ss, null);
- }
- } catch (Exception e) {
+ });
}
}
});
@@ -242,22 +287,32 @@
@SuppressWarnings("unchecked")
@Override
public void insertion(TupleSet tuples) {
- try {
- for (Tuple t : tuples) {
- Object[] data = t.toArray();
- UUID jobId = (UUID) data[0];
- Set<String> ts = (Set<String>) data[1];
- ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
- .size()];
- int i = 0;
- for (String n : ts) {
- jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+ for (final Tuple t : tuples) {
+ jobQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ Set<String> ts = (Set<String>) data[1];
+ ClusterControllerService.JobCompleteNotifier[] jcns = new ClusterControllerService.JobCompleteNotifier[ts
+ .size()];
+ int i = 0;
+ for (String n : ts) {
+ jcns[i++] = new ClusterControllerService.JobCompleteNotifier(n, jobId);
+ }
+ try {
+ ccs.runRemote(jcns, null);
+ } finally {
+ BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable
+ .createTuple(jobId));
+ jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
+ jolRuntime.evaluate();
+ }
+ } catch (Exception e) {
+ }
}
- ccs.runRemote(jcns, null);
- BasicTupleSet jccTuples = new BasicTupleSet(JobCleanUpCompleteTable.createTuple(jobId));
- jolRuntime.schedule(JOL_SCOPE, JobCleanUpCompleteTable.TABLE_NAME, jccTuples, null);
- }
- } catch (Exception e) {
+ });
}
}
});
@@ -271,28 +326,39 @@
@SuppressWarnings("unchecked")
@Override
public void insertion(TupleSet tuples) {
- try {
- for (Tuple t : tuples) {
- Object[] data = t.toArray();
- UUID jobId = (UUID) data[0];
- UUID stageId = (UUID) data[1];
- Integer attempt = (Integer) data[2];
- Set<List> ts = (Set<List>) data[4];
- ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
- .size()];
- int i = 0;
- BasicTupleSet notificationTuples = new BasicTupleSet();
- for (List t2 : ts) {
- Object[] t2Data = t2.toArray();
- String nodeId = (String) t2Data[0];
- jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId, attempt);
- notificationTuples.add(AbortNotifyTable.createTuple(jobId, stageId, nodeId, attempt));
+ for (final Tuple t : tuples) {
+ jobQueue.add(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Object[] data = t.toArray();
+ UUID jobId = (UUID) data[0];
+ UUID stageId = (UUID) data[1];
+ Integer attempt = (Integer) data[2];
+ Set<List> ts = (Set<List>) data[4];
+ ClusterControllerService.JobletAborter[] jas = new ClusterControllerService.JobletAborter[ts
+ .size()];
+ int i = 0;
+ BasicTupleSet notificationTuples = new BasicTupleSet();
+ for (List t2 : ts) {
+ Object[] t2Data = t2.toArray();
+ String nodeId = (String) t2Data[0];
+ jas[i++] = new ClusterControllerService.JobletAborter(nodeId, jobId, stageId,
+ attempt);
+ notificationTuples.add(AbortNotifyTable
+ .createTuple(jobId, stageId, nodeId, attempt));
+ }
+ try {
+ ccs.runRemote(jas, null);
+ } finally {
+ jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples,
+ null);
+ jolRuntime.evaluate();
+ }
+ } catch (Exception e) {
+ }
}
- ccs.runRemote(jas, null);
-
- jolRuntime.schedule(JOL_SCOPE, AbortNotifyTable.TABLE_NAME, notificationTuples, null);
- }
- } catch (Exception e) {
+ });
}
}
});
@@ -302,7 +368,7 @@
}
@Override
- public synchronized UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
+ public UUID createJob(JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) throws Exception {
final UUID jobId = UUID.randomUUID();
final JobPlanBuilder builder = new JobPlanBuilder();
@@ -341,15 +407,11 @@
BasicTupleSet odTuples = new BasicTupleSet();
BasicTupleSet olTuples = new BasicTupleSet();
+ BasicTupleSet ocTuples = new BasicTupleSet();
for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> e : jobSpec.getOperatorMap().entrySet()) {
IOperatorDescriptor od = e.getValue();
- PartitionConstraint pc = od.getPartitionConstraint();
- LocationConstraint[] locationConstraints = pc.getLocationConstraints();
- int nPartitions = locationConstraints.length;
+ int nPartitions = addPartitionConstraintTuples(jobId, od, olTuples, ocTuples);
odTuples.add(OperatorDescriptorTable.createTuple(jobId, nPartitions, od));
- for (int i = 0; i < locationConstraints.length; ++i) {
- addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i]);
- }
od.contributeTaskGraph(gBuilder);
}
@@ -363,6 +425,7 @@
jolRuntime.schedule(JOL_SCOPE, JobTable.TABLE_NAME, jobTuples, null);
jolRuntime.schedule(JOL_SCOPE, OperatorDescriptorTable.TABLE_NAME, odTuples, null);
jolRuntime.schedule(JOL_SCOPE, OperatorLocationTable.TABLE_NAME, olTuples, null);
+ jolRuntime.schedule(JOL_SCOPE, OperatorCloneCountTable.TABLE_NAME, ocTuples, null);
jolRuntime.schedule(JOL_SCOPE, ConnectorDescriptorTable.TABLE_NAME, cdTuples, null);
jolRuntime.schedule(JOL_SCOPE, ActivityNodeTable.TABLE_NAME, anTuples, null);
jolRuntime.schedule(JOL_SCOPE, ActivityConnectionTable.TABLE_NAME, acTuples, null);
@@ -373,17 +436,39 @@
return jobId;
}
+ private int addPartitionConstraintTuples(UUID jobId, IOperatorDescriptor od, BasicTupleSet olTuples,
+ BasicTupleSet ocTuples) {
+ PartitionConstraint pc = od.getPartitionConstraint();
+
+ switch (pc.getPartitionConstraintType()) {
+ case COUNT:
+ int count = ((PartitionCountConstraint) pc).getCount();
+ ocTuples.add(OperatorCloneCountTable.createTuple(jobId, od.getOperatorId(), count));
+ return count;
+
+ case EXPLICIT:
+ LocationConstraint[] locationConstraints = ((ExplicitPartitionConstraint) pc).getLocationConstraints();
+ for (int i = 0; i < locationConstraints.length; ++i) {
+ addLocationConstraintTuple(olTuples, jobId, od.getOperatorId(), i, locationConstraints[i], 0);
+ }
+ return locationConstraints.length;
+ }
+ throw new IllegalArgumentException();
+ }
+
private void addLocationConstraintTuple(BasicTupleSet olTuples, UUID jobId, OperatorDescriptorId opId, int i,
- LocationConstraint locationConstraint) {
+ LocationConstraint locationConstraint, int benefit) {
switch (locationConstraint.getConstraintType()) {
case ABSOLUTE:
String nodeId = ((AbsoluteLocationConstraint) locationConstraint).getLocationId();
- olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i));
+ olTuples.add(OperatorLocationTable.createTuple(jobId, opId, nodeId, i, benefit));
break;
case CHOICE:
+ int index = 0;
for (LocationConstraint lc : ((ChoiceLocationConstraint) locationConstraint).getChoices()) {
- addLocationConstraintTuple(olTuples, jobId, opId, i, lc);
+ addLocationConstraintTuple(olTuples, jobId, opId, i, lc, benefit - index);
+ index++;
}
}
}
@@ -404,7 +489,30 @@
}
@Override
- public void notifyNodeFailure(String nodeId) throws Exception {
+ public synchronized void notifyNodeFailure(String nodeId) throws Exception {
+ int len = rankedAvailableNodes.size();
+ int delIndex = -1;
+ for (int i = 0; i < len; ++i) {
+ if (nodeId.equals(rankedAvailableNodes.get(i))) {
+ delIndex = i;
+ break;
+ }
+ }
+ if (delIndex < 0) {
+ return;
+ }
+ BasicTupleSet delRANTuples = new BasicTupleSet();
+ delRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, delIndex));
+
+ BasicTupleSet insRANTuples = new BasicTupleSet();
+ for (int i = delIndex + 1; i < len; ++i) {
+ insRANTuples.add(RankedAvailableNodesTable.createTuple(rankedAvailableNodes.get(i), i - 1));
+ }
+
+ rankedAvailableNodes.remove(delIndex);
+
+ jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, delRANTuples);
+
BasicTupleSet unavailableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, null, unavailableTuples);
@@ -419,7 +527,7 @@
}
@Override
- public synchronized void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
+ public void notifyStageletComplete(UUID jobId, UUID stageId, int attempt, String nodeId,
StageletStatistics statistics) throws Exception {
BasicTupleSet scTuples = new BasicTupleSet();
scTuples.add(StageletCompleteTable.createTuple(jobId, stageId, nodeId, attempt, statistics));
@@ -430,8 +538,7 @@
}
@Override
- public synchronized void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId)
- throws Exception {
+ public void notifyStageletFailure(UUID jobId, UUID stageId, int attempt, String nodeId) throws Exception {
BasicTupleSet sfTuples = new BasicTupleSet();
sfTuples.add(StageletFailureTable.createTuple(jobId, stageId, nodeId, attempt));
@@ -441,7 +548,7 @@
}
@Override
- public synchronized void start(UUID jobId) throws Exception {
+ public void start(UUID jobId) throws Exception {
BasicTupleSet jsTuples = new BasicTupleSet();
jsTuples.add(JobStartTable.createTuple(jobId, System.currentTimeMillis()));
@@ -451,7 +558,13 @@
}
@Override
- public void registerNode(String nodeId) throws Exception {
+ public synchronized void registerNode(String nodeId) throws Exception {
+ rankedAvailableNodes.add(nodeId);
+ BasicTupleSet insRANTuples = new BasicTupleSet();
+ insRANTuples.add(RankedAvailableNodesTable.createTuple(nodeId, rankedAvailableNodes.size() - 1));
+
+ jolRuntime.schedule(JOL_SCOPE, RankedAvailableNodesTable.TABLE_NAME, insRANTuples, null);
+
BasicTupleSet availableTuples = new BasicTupleSet(AvailableNodesTable.createTuple(nodeId));
jolRuntime.schedule(JOL_SCOPE, AvailableNodesTable.TABLE_NAME, availableTuples, null);
@@ -555,15 +668,37 @@
@SuppressWarnings("unchecked")
private static final Class[] SCHEMA = new Class[] {
- UUID.class, OperatorDescriptorId.class, String.class, Integer.class
+ UUID.class, OperatorDescriptorId.class, String.class, Integer.class, Integer.class
};
public OperatorLocationTable(Runtime context) {
super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
}
- static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition) {
- return new Tuple(jobId, opId, nodeId, partition);
+ static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, String nodeId, int partition, int benefit) {
+ return new Tuple(jobId, opId, nodeId, partition, benefit);
+ }
+ }
+
+ /*
+ * declare(operatorclonecount, keys(0, 1), {JobId, ODId, Count})
+ */
+ private static class OperatorCloneCountTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "operatorclonecount");
+
+ private static Key PRIMARY_KEY = new Key();
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, Integer.class
+ };
+
+ public OperatorCloneCountTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ static Tuple createTuple(UUID jobId, OperatorDescriptorId opId, int cloneCount) {
+ return new Tuple(jobId, opId, cloneCount);
}
}
@@ -821,6 +956,28 @@
}
/*
+ * declare(rankedavailablenodes, keys(0), {NodeId, Integer})
+ */
+ private static class RankedAvailableNodesTable extends BasicTable {
+ private static TableName TABLE_NAME = new TableName(JOL_SCOPE, "rankedavailablenodes");
+
+ private static Key PRIMARY_KEY = new Key(0);
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ String.class, Integer.class
+ };
+
+ public RankedAvailableNodesTable(Runtime context) {
+ super(context, TABLE_NAME, PRIMARY_KEY, SCHEMA);
+ }
+
+ public static Tuple createTuple(String nodeId, int rank) {
+ return new Tuple(nodeId, rank);
+ }
+ }
+
+ /*
* declare(failednodes, keys(0), {NodeId})
*/
private static class FailedNodesTable extends BasicTable {
@@ -881,4 +1038,52 @@
return new Tuple(jobId, stageId, nodeId, attempt);
}
}
+
+ private static class ExpandPartitionCountConstraintTableFunction extends Function {
+ private static final String TABLE_NAME = "expandpartitioncountconstraint";
+
+ @SuppressWarnings("unchecked")
+ private static final Class[] SCHEMA = new Class[] {
+ UUID.class, OperatorDescriptorId.class, Integer.class, Integer.class
+ };
+
+ public ExpandPartitionCountConstraintTableFunction() {
+ super(TABLE_NAME, SCHEMA);
+ }
+
+ @Override
+ public TupleSet insert(TupleSet tuples, TupleSet conflicts) throws UpdateException {
+ TupleSet result = new BasicTupleSet();
+ int counter = 0;
+ for (Tuple t : tuples) {
+ int nPartitions = (Integer) t.value(2);
+ for (int i = 0; i < nPartitions; ++i) {
+ result.add(new Tuple(t.value(0), t.value(1), i, counter++));
+ }
+ }
+ return result;
+ }
+ }
+
+ private class JobQueueThread extends Thread {
+ public JobQueueThread() {
+ setDaemon(true);
+ }
+
+ public void run() {
+ Runnable r;
+ while (true) {
+ try {
+ r = jobQueue.take();
+ } catch (InterruptedException e) {
+ continue;
+ }
+ try {
+ r.run();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
index c202b3d..195fe00 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/NodeControllerService.java
@@ -288,6 +288,10 @@
PortInstanceId piId = new PortInstanceId(spec.getConsumer(conn).getOperatorId(),
Direction.INPUT, spec.getConsumerInputIndex(conn), index);
Endpoint ep = globalPortMap.get(piId);
+ if (ep == null) {
+ LOGGER.info("Got null Endpoint for " + piId);
+ throw new NullPointerException();
+ }
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("Probed endpoint " + piId + " -> " + ep);
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
index 73667ed..e16cf30 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/controller/nodecontroller/Stagelet.java
@@ -117,13 +117,14 @@
+ opIId.getOperatorId() + ":" + opIId.getPartition());
} catch (Exception e) {
e.printStackTrace();
- notifyOperatorFailure(opIId);
+ // notifyOperatorFailure(opIId);
}
try {
hon.run();
notifyOperatorCompletion(opIId);
} catch (Exception e) {
- notifyOperatorFailure(opIId);
+ e.printStackTrace();
+ // notifyOperatorFailure(opIId);
}
}
});
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
index 86c247d..b2dc234 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/group/GroupingHashTable.java
@@ -34,8 +34,7 @@
class GroupingHashTable {
/**
- * The pointers in the link store 3 int values for each entry in the
- * hashtable: (bufferIdx, tIndex, accumulatorIdx).
+ * The pointers in the link store 3 int values for each entry in the hashtable: (bufferIdx, tIndex, accumulatorIdx).
*
* @author vinayakb
*/
@@ -81,8 +80,8 @@
private final FrameTupleAccessor storedKeysAccessor;
GroupingHashTable(HyracksContext ctx, int[] fields, IBinaryComparatorFactory[] comparatorFactories,
- ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
- RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
+ ITuplePartitionComputerFactory tpcf, IAccumulatingAggregatorFactory aggregatorFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor outRecordDescriptor, int tableSize) {
this.ctx = ctx;
appender = new FrameTupleAppender(ctx);
buffers = new ArrayList<ByteBuffer>();
@@ -161,7 +160,7 @@
}
int saIndex = accumulatorSize++;
aggregator = accumulators[saIndex] = aggregatorFactory.createAggregator(inRecordDescriptor,
- outRecordDescriptor);
+ outRecordDescriptor);
aggregator.init(accessor, tIndex);
link.add(sbIndex, stIndex, saIndex);
}
diff --git a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
index c1b917d..7ec0eab 100644
--- a/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
+++ b/hyracks-core/src/main/java/edu/uci/ics/hyracks/coreops/hadoop/HadoopReadOperatorDescriptor.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.controller.IClusterController;
@@ -61,7 +62,7 @@
private Object value;
public HDFSCustomReader(Map<String, String> jobConfMap, HadoopFileSplit inputSplit,
- String inputFormatClassName, Reporter reporter) {
+ String inputFormatClassName, Reporter reporter) {
try {
JobConf conf = DatatypeHelper.hashMap2JobConf((HashMap) jobConfMap);
FileSystem fileSystem = null;
@@ -74,7 +75,7 @@
Class inputFormatClass = Class.forName(inputFormatClassName);
InputFormat inputFormat = (InputFormat) ReflectionUtils.newInstance(inputFormatClass, conf);
hadoopRecordReader = (RecordReader) inputFormat.getRecordReader(getFileSplit(inputSplit), conf,
- reporter);
+ reporter);
if (hadoopRecordReader instanceof SequenceFileRecordReader) {
inputKeyClass = ((SequenceFileRecordReader) hadoopRecordReader).getKeyClass();
inputValueClass = ((SequenceFileRecordReader) hadoopRecordReader).getValueClass();
@@ -130,27 +131,27 @@
private FileSplit getFileSplit(HadoopFileSplit hadoopFileSplit) {
FileSplit fileSplit = new FileSplit(new Path(hadoopFileSplit.getFile()), hadoopFileSplit.getStart(),
- hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
+ hadoopFileSplit.getLength(), hadoopFileSplit.getHosts());
return fileSplit;
}
}
public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, JobSpecification spec,
- HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ HadoopFileSplit[] splits, String inputFormatClassName, RecordDescriptor recordDescriptor) {
super(spec, splits, recordDescriptor);
this.inputFormatClassName = inputFormatClassName;
this.jobConfMap = jobConfMap;
}
public HadoopReadOperatorDescriptor(Map<String, String> jobConfMap, InetSocketAddress nameNode,
- JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ JobSpecification spec, String inputFormatClassName, RecordDescriptor recordDescriptor) {
super(spec, null, recordDescriptor);
this.inputFormatClassName = inputFormatClassName;
this.jobConfMap = jobConfMap;
}
public HadoopReadOperatorDescriptor(IClusterController clusterController, Map<String, String> jobConfMap,
- JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
+ JobSpecification spec, String fileSystemURL, String inputFormatClassName, RecordDescriptor recordDescriptor) {
super(spec, null, recordDescriptor);
HadoopAdapter hadoopAdapter = HadoopAdapter.getInstance(fileSystemURL);
String inputPathString = jobConfMap.get("mapred.input.dir");
@@ -170,7 +171,7 @@
}
private void configurePartitionConstraints(IClusterController clusterController,
- Map<String, List<HadoopFileSplit>> blocksToRead) {
+ Map<String, List<HadoopFileSplit>> blocksToRead) {
List<LocationConstraint> locationConstraints = new ArrayList<LocationConstraint>();
Map<String, INodeController> registry = null;
try {
@@ -223,8 +224,8 @@
}
}
- PartitionConstraint partitionConstraint = new PartitionConstraint(locationConstraints
- .toArray(new LocationConstraint[] {}));
+ PartitionConstraint partitionConstraint = new ExplicitPartitionConstraint(locationConstraints
+ .toArray(new LocationConstraint[] {}));
this.setPartitionConstraint(partitionConstraint);
}
diff --git a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
index 3813143..8b7bcff 100644
--- a/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
+++ b/hyracks-core/src/main/resources/edu/uci/ics/hyracks/controller/clustercontroller/scheduler.olg
@@ -112,20 +112,76 @@
watch(stagestart, a);
watch(stagestart, d);
-define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, String});
+define(operatorlocationcandidates, keys(), {UUID, OperatorDescriptorId, String, Integer, Integer});
operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit) :-
- operatorlocation(JobId, OperatorId, NodeId, Partition),
- availablenodes(NodeId)
+ operatorlocation(JobId, OperatorId, NodeId, Partition, Benefit),
+ availablenodes(NodeId);
+
+watch(availablenodes, a);
+watch(availablenodes, i);
+watch(availablenodes, d);
+
+/*
+
+define(rankedavailablenodes_temp, keys(), {String, Integer});
+
+rankedavailablenodes_temp(NodeId, 0) :-
+ availablenodes#insert(NodeId);
+
+rankedavailablenodes_temp(NodeId2, NewRank) :-
+ rankedavailablenodes_temp#insert(NodeId1, Rank),
+ rankedavailablenodes_temp(NodeId2, Rank),
+ NodeId1 < NodeId2
{
- Benefit := NodeId;
+ NewRank := Rank + 1;
};
+rankedavailablenodes_temp(NodeId2, NewRank) :-
+ rankedavailablenodes_temp(NodeId1, Rank),
+ rankedavailablenodes_temp#insert(NodeId2, Rank),
+ NodeId1 < NodeId2
+ {
+ NewRank := Rank + 1;
+ };
+
+rankedavailablenodes_temp(NodeId, Rank) :-
+ availablenodes(NodeId),
+ rankedavailablenodes_temp(NodeId, Rank);
+
+rankedavailablenodes_temp(NodeId, NewRank) :-
+ rankedavailablenodes_temp(NodeId, Rank),
+ Rank != 0,
+ notin rankedavailablenodes_temp(_, Rank - 1)
+ {
+ NewRank := Rank - 1;
+ };
+
+define(rankedavailablenodes, keys(0), {String, Integer});
+
+rankedavailablenodes(NodeId, max<Rank>) :-
+ rankedavailablenodes_temp(NodeId, Rank);
+
+*/
+
+define(availablenodecount, keys(0), {Integer, Integer});
+
+watch(availablenodecount, a);
+watch(availablenodecount, i);
+watch(availablenodecount, d);
+
+availablenodecount(0, count<NodeId>) :-
+ availablenodes(NodeId);
+
+watch(rankedavailablenodes, a);
+watch(rankedavailablenodes, i);
+watch(rankedavailablenodes, d);
+
watch(operatorlocationcandidates, a);
watch(operatorlocationcandidates, i);
watch(operatorlocationcandidates, d);
-define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, String});
+define(maxoperatorlocationbenefit, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
maxoperatorlocationbenefit(JobId, OperatorId, Partition, max<Benefit>) :-
operatorlocationcandidates(JobId, OperatorId, _, Partition, Benefit);
@@ -134,16 +190,81 @@
watch(maxoperatorlocationbenefit, i);
watch(maxoperatorlocationbenefit, d);
-define(operatorlocationdecision, keys(0, 1, 3), {UUID, OperatorDescriptorId, String, Integer});
+define(attemptoperatorlocationdecision, keys(0, 1, 3, 4), {UUID, OperatorDescriptorId, String, Integer, Integer});
-watch(operatorlocationdecision, a);
-watch(operatorlocationdecision, i);
-watch(operatorlocationdecision, d);
+watch(attemptoperatorlocationdecision, a);
+watch(attemptoperatorlocationdecision, i);
+watch(attemptoperatorlocationdecision, d);
-operatorlocationdecision(JobId, OperatorId, NodeId, Partition) :-
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
operatorlocationcandidates(JobId, OperatorId, NodeId, Partition, Benefit),
maxoperatorlocationbenefit(JobId, OperatorId, Partition, Benefit);
+attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt) :-
+ jobattempt#insert(JobId, Attempt),
+ operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, CloneRank),
+ rankedavailablenodes(NodeId, NodeRank),
+ availablenodecount(_, NodeCount),
+ NodeRank == CloneRank % NodeCount;
+
+define(operatorclonecount_temp, keys(), {UUID, OperatorDescriptorId, Integer, Integer});
+
+operatorclonecount_temp(JobId, OperatorId, NPartitions, 0) :-
+ operatorclonecount(JobId, OperatorId, NPartitions);
+
+/*
+define(operatorclonecountexpansion, keys(0, 1), {UUID, OperatorDescriptorId, Integer});
+
+operatorclonecountexpansion(JobId, OperatorId, 0) :-
+ operatorclonecount(JobId, OperatorId, _);
+
+operatorclonecountexpansion(JobId, OperatorId, Partition + 1) :-
+ operatorclonecountexpansion#insert(JobId, OperatorId, Partition),
+ operatorclonecount(JobId, OperatorId, NPartitions),
+ Partition < NPartitions - 1;
+*/
+
+define(operatorclonecountexpansiontotalorder, keys(0, 1, 2), {UUID, OperatorDescriptorId, Integer, Integer});
+
+/*
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, 0) :-
+ operatorclonecountexpansion#insert(JobId, OperatorId, Partition);
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
+ operatorclonecountexpansiontotalorder#insert(JobId, OperatorId1, Partition1, Rank),
+ operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, Rank),
+ OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
+ {
+ NewRank := Rank + 1;
+ };
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId2, Partition2, NewRank) :-
+ operatorclonecountexpansiontotalorder(JobId, OperatorId1, Partition1, Rank),
+ operatorclonecountexpansiontotalorder#insert(JobId, OperatorId2, Partition2, Rank),
+ OperatorId1.hashCode() < OperatorId2.hashCode() || (OperatorId1.hashCode() == OperatorId2.hashCode() && Partition1 < Partition2)
+ {
+ NewRank := Rank + 1;
+ };
+*/
+
+operatorclonecountexpansiontotalorder(JobId, OperatorId, Partition, Rank) :-
+ expandpartitioncountconstraint(operatorclonecount_temp(JobId, OperatorId, Partition, Rank));
+
+watch(operatorclonecountexpansiontotalorder, a);
+watch(operatorclonecountexpansiontotalorder, i);
+watch(operatorclonecountexpansiontotalorder, d);
+
+watch(operatorclonecount, a);
+watch(operatorclonecount, i);
+watch(operatorclonecount, d);
+
+/*
+watch(operatorclonecountexpansion, a);
+watch(operatorclonecountexpansion, i);
+watch(operatorclonecountexpansion, d);
+*/
+
define(activitystart, keys(), {UUID, OperatorDescriptorId, ActivityNodeId, Integer, Integer, UUID, String, Integer});
activitystart(JobId, OperatorId, ActivityId, StageNumber, Attempt, StageId, NodeId, Partition) :-
@@ -151,7 +272,7 @@
operatordescriptor(JobId, OperatorId, _, _),
activitystage(JobId, OperatorId, ActivityId, StageNumber),
jobstage(JobId, StageNumber, StageId),
- operatorlocationdecision(JobId, OperatorId, NodeId, Partition);
+ attemptoperatorlocationdecision(JobId, OperatorId, NodeId, Partition, Attempt);
watch(activitystart, a);
@@ -192,7 +313,7 @@
stageletabort(JobId, StageId, JobPlan, NodeIdOther, Attempt, ActivityIdSet) :-
stageletstart(JobId, StageId, JobPlan, NodeId, Attempt, _),
stageletstart(JobId, StageId, _, NodeIdOther, Attempt, ActivityIdSet),
- failednodes(NodeId),
+ failednodes#insert(NodeId),
notin stageletcomplete(JobId, StageId, NodeId, Attempt, _);
watch(stageletabort, a);
@@ -202,19 +323,20 @@
define(stageabort, keys(0, 1, 2), {UUID, UUID, Integer, Set});
stageabort(JobId, StageId, Attempt, set<NodeId>) :-
- stageletabort(JobId, StageId, _, NodeId, Attempt, _);
+ stageletabort#insert(JobId, StageId, _, NodeId, Attempt, _);
define(abortmessage_agg, keys(0, 1, 2), {UUID, UUID, Integer, JobPlan, Set});
abortmessage_agg(JobId, StageId, Attempt, JobPlan, set<Tuple>) :-
- stageletabort(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
+ stageletabort#insert(JobId, StageId, JobPlan, NodeId, Attempt, ActivityIdSet),
availablenodes(NodeId)
{
Tuple := [NodeId, ActivityIdSet];
};
abortmessage(JobId, StageId, Attempt, JobPlan, TSet) :-
- abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet);
+ abortmessage_agg(JobId, StageId, Attempt, JobPlan, TSet),
+ TSet.size() != 0;
watch(abortmessage, a);
watch(abortmessage, i);
@@ -226,7 +348,7 @@
stageletabortcomplete(JobId, StageId, NodeId, Attempt) :-
stageletabort(JobId, StageId, _, NodeId, Attempt, _),
- failednodes(NodeId);
+ notin availablenodes(NodeId);
define(stageletabortcomplete_agg, keys(0, 1, 2), {UUID, UUID, Integer, Set});
@@ -262,7 +384,7 @@
jobcleanup_agg(JobId, set<NodeId>) :-
stagestart#insert(JobId, StageNumber, Attempt),
stagefinish(JobId, _, Attempt, _),
- operatorlocationdecision(JobId, _, NodeId, Attempt),
+ attemptoperatorlocationdecision(JobId, _, NodeId, _, Attempt),
notin jobstage(JobId, StageNumber);
jobcleanup(JobId, NodeIdSet) :-
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
index a3beb11..b7c2e57 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/CountOfCountsTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -49,57 +50,85 @@
public void countOfCountsSingleNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
- PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc);
+ PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
- PartitionConstraint groupPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+ });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(0), desc2);
+ PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
group.setPartitionConstraint(groupPartitionConstraint);
- InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
- PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc2);
+ PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
- PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(1), desc2);
+ PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -116,59 +145,91 @@
public void countOfCountsMultiNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
- PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc);
+ PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
- PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+ });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(0), desc2);
+ PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
group.setPartitionConstraint(groupPartitionConstraint);
- InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
- PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemorySortOperatorDescriptor sorter2 = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc2);
+ PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
- PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(1), desc2);
+ PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
@@ -185,59 +246,91 @@
public void countOfCountsExternalSortMultiNC() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
- ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 0 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc);
- PartitionConstraint sorterPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+ 0
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc);
+ PartitionConstraint sorterPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter.setPartitionConstraint(sorterPartitionConstraint);
RecordDescriptor desc2 = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
- PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 0 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(0), desc2);
- PartitionConstraint groupPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID),
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE
+ });
+ PreclusteredGroupOperatorDescriptor group = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 0
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(0), desc2);
+ PartitionConstraint groupPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID),
+ new AbsoluteLocationConstraint(NC1_ID),
+ new AbsoluteLocationConstraint(NC2_ID)
+ });
group.setPartitionConstraint(groupPartitionConstraint);
- ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, desc2);
- PartitionConstraint sorterPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ ExternalSortOperatorDescriptor sorter2 = new ExternalSortOperatorDescriptor(spec, 3, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, desc2);
+ PartitionConstraint sorterPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter2.setPartitionConstraint(sorterPartitionConstraint2);
- PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] { 1 },
- new IComparatorFactory[] { StringComparatorFactory.INSTANCE }, new SumStringGroupAggregator(1), desc2);
- PartitionConstraint groupPartitionConstraint2 = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ PreclusteredGroupOperatorDescriptor group2 = new PreclusteredGroupOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IComparatorFactory[] {
+ StringComparatorFactory.INSTANCE
+ }, new SumStringGroupAggregator(1), desc2);
+ PartitionConstraint groupPartitionConstraint2 = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
group2.setPartitionConstraint(groupPartitionConstraint2);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn1 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn1, csvScanner, 0, sorter, 0);
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, sorter, 0, group, 0);
IConnectorDescriptor conn3 = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(conn3, group, 0, sorter2, 0);
IConnectorDescriptor conn4 = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
index d619bba..24d45fa 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/ScanPrintTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -36,19 +37,23 @@
public void scanPrint01() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] splits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/words.txt")),
- new FileSplit(NC1_ID, new File("data/words.txt")) };
- RecordDescriptor desc = new RecordDescriptor(
- new ISerializerDeserializer[] { StringSerializerDeserializer.INSTANCE });
+ FileSplit[] splits = new FileSplit[] {
+ new FileSplit(NC2_ID, new File("data/words.txt")), new FileSplit(NC1_ID, new File("data/words.txt"))
+ };
+ RecordDescriptor desc = new RecordDescriptor(new ISerializerDeserializer[] {
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor csvScanner = new CSVFileScanOperatorDescriptor(spec, splits, desc);
- PartitionConstraint csvPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint csvPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+ });
csvScanner.setPartitionConstraint(csvPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC2_ID), new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor conn = new OneToOneConnectorDescriptor(spec);
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
index cd254e2..0b19cea 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/SortMergeTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -43,39 +44,57 @@
JobSpecification spec = new JobSpecification();
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
- InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] { 1 },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, ordersDesc);
- PartitionConstraint sortersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemorySortOperatorDescriptor sorter = new InMemorySortOperatorDescriptor(spec, new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, ordersDesc);
+ PartitionConstraint sortersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
sorter.setPartitionConstraint(sortersPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new MToNHashPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- new int[] { 1 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }),
- new int[] { 1 }, new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }), sorter, 0,
- printer, 0);
-
+ new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }), new int[] {
+ 1
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }), sorter, 0, printer, 0);
+
runTest(spec);
}
}
\ No newline at end of file
diff --git a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 593eac1..7494f11 100644
--- a/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-core/src/test/java/edu/uci/ics/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -19,6 +19,7 @@
import org.junit.Test;
import edu.uci.ics.hyracks.api.constraints.AbsoluteLocationConstraint;
+import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.LocationConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -42,83 +43,94 @@
public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
/*
- * TPCH Customer table:
- * CREATE TABLE CUSTOMER (
- * C_CUSTKEY INTEGER NOT NULL,
- * C_NAME VARCHAR(25) NOT NULL,
- * C_ADDRESS VARCHAR(40) NOT NULL,
- * C_NATIONKEY INTEGER NOT NULL,
- * C_PHONE CHAR(15) NOT NULL,
- * C_ACCTBAL DECIMAL(15,2) NOT NULL,
- * C_MKTSEGMENT CHAR(10) NOT NULL,
- * C_COMMENT VARCHAR(117) NOT NULL
- * );
- * TPCH Orders table:
- * CREATE TABLE ORDERS (
- * O_ORDERKEY INTEGER NOT NULL,
- * O_CUSTKEY INTEGER NOT NULL,
- * O_ORDERSTATUS CHAR(1) NOT NULL,
- * O_TOTALPRICE DECIMAL(15,2) NOT NULL,
- * O_ORDERDATE DATE NOT NULL,
- * O_ORDERPRIORITY CHAR(15) NOT NULL,
- * O_CLERK CHAR(15) NOT NULL,
- * O_SHIPPRIORITY INTEGER NOT NULL,
- * O_COMMENT VARCHAR(79) NOT NULL
- * );
+ * TPCH Customer table: CREATE TABLE CUSTOMER ( C_CUSTKEY INTEGER NOT NULL, C_NAME VARCHAR(25) NOT NULL, C_ADDRESS VARCHAR(40) NOT NULL, C_NATIONKEY INTEGER NOT NULL, C_PHONE CHAR(15) NOT NULL, C_ACCTBAL DECIMAL(15,2) NOT NULL, C_MKTSEGMENT CHAR(10) NOT NULL, C_COMMENT VARCHAR(117) NOT NULL ); TPCH Orders table: CREATE TABLE ORDERS ( O_ORDERKEY INTEGER NOT NULL, O_CUSTKEY INTEGER NOT NULL, O_ORDERSTATUS CHAR(1) NOT NULL, O_TOTALPRICE DECIMAL(15,2) NOT NULL, O_ORDERDATE DATE NOT NULL, O_ORDERPRIORITY CHAR(15) NOT NULL, O_CLERK CHAR(15) NOT NULL, O_SHIPPRIORITY INTEGER NOT NULL, O_COMMENT VARCHAR(79) NOT NULL );
*/
@Test
public void customerOrderCIDJoin() throws Exception {
JobSpecification spec = new JobSpecification();
- FileSplit[] custSplits = new FileSplit[] { new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl")) };
+ FileSplit[] custSplits = new FileSplit[] {
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer.tbl"))
+ };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
- FileSplit[] ordersSplits = new FileSplit[] { new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl")) };
+ FileSplit[] ordersSplits = new FileSplit[] {
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
- PartitionConstraint custPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
custScanner.setPartitionConstraint(custPartitionConstraint);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
- PartitionConstraint joinPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);
@@ -139,67 +151,104 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+ };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
- PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
custScanner.setPartitionConstraint(custPartitionConstraint);
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
- PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(ordJoinConn, ordScanner, 0, join, 0);
IConnectorDescriptor custJoinConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(custJoinConn, custScanner, 0, join, 1);
IConnectorDescriptor joinPrinterConn = new MToNReplicatingConnectorDescriptor(spec);
@@ -214,75 +263,114 @@
JobSpecification spec = new JobSpecification();
FileSplit[] custSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/customer-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/customer-part2.tbl"))
+ };
RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
FileSplit[] ordersSplits = new FileSplit[] {
- new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
- new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl")) };
+ new FileSplit(NC1_ID, new File("data/tpch0.001/orders-part1.tbl")),
+ new FileSplit(NC2_ID, new File("data/tpch0.001/orders-part2.tbl"))
+ };
RecordDescriptor ordersDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
RecordDescriptor custOrderJoinDesc = new RecordDescriptor(new ISerializerDeserializer[] {
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE, StringSerializerDeserializer.INSTANCE,
- StringSerializerDeserializer.INSTANCE });
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE,
+ StringSerializerDeserializer.INSTANCE
+ });
CSVFileScanOperatorDescriptor ordScanner = new CSVFileScanOperatorDescriptor(spec, ordersSplits, ordersDesc,
- '|', "'\"");
- PartitionConstraint ordersPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ '|', "'\"");
+ PartitionConstraint ordersPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
ordScanner.setPartitionConstraint(ordersPartitionConstraint);
CSVFileScanOperatorDescriptor custScanner = new CSVFileScanOperatorDescriptor(spec, custSplits, custDesc, '|',
- "'\"");
- PartitionConstraint custPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ "'\"");
+ PartitionConstraint custPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
custScanner.setPartitionConstraint(custPartitionConstraint);
MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc);
- ordMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+ ordMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ }));
MaterializingOperatorDescriptor custMat = new MaterializingOperatorDescriptor(spec, custDesc);
- custMat.setPartitionConstraint(new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) }));
+ custMat.setPartitionConstraint(new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ }));
- InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
- new int[] { 0 }, new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE },
- new IBinaryComparatorFactory[] { StringBinaryComparatorFactory.INSTANCE }, custOrderJoinDesc, 128);
- PartitionConstraint joinPartitionConstraint = new PartitionConstraint(new LocationConstraint[] {
- new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID) });
+ InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] {
+ 1
+ }, new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }, new IBinaryComparatorFactory[] {
+ StringBinaryComparatorFactory.INSTANCE
+ }, custOrderJoinDesc, 128);
+ PartitionConstraint joinPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID), new AbsoluteLocationConstraint(NC2_ID)
+ });
join.setPartitionConstraint(joinPartitionConstraint);
PrinterOperatorDescriptor printer = new PrinterOperatorDescriptor(spec);
- PartitionConstraint printerPartitionConstraint = new PartitionConstraint(
- new LocationConstraint[] { new AbsoluteLocationConstraint(NC1_ID) });
+ PartitionConstraint printerPartitionConstraint = new ExplicitPartitionConstraint(new LocationConstraint[] {
+ new AbsoluteLocationConstraint(NC1_ID)
+ });
printer.setPartitionConstraint(printerPartitionConstraint);
IConnectorDescriptor ordPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 1 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 1
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(ordPartConn, ordScanner, 0, ordMat, 0);
IConnectorDescriptor custPartConn = new MToNHashPartitioningConnectorDescriptor(spec,
- new FieldHashPartitionComputerFactory(new int[] { 0 },
- new IBinaryHashFunctionFactory[] { StringBinaryHashFunctionFactory.INSTANCE }));
+ new FieldHashPartitionComputerFactory(new int[] {
+ 0
+ }, new IBinaryHashFunctionFactory[] {
+ StringBinaryHashFunctionFactory.INSTANCE
+ }));
spec.connect(custPartConn, custScanner, 0, custMat, 0);
IConnectorDescriptor ordJoinConn = new OneToOneConnectorDescriptor(spec);