Added more connector policies. Cleaned up Activity API.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@431 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityNodeId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
similarity index 73%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityNodeId.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
index 12f76fe..5a58b7b 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityNodeId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/ActivityId.java
@@ -15,14 +15,13 @@
package edu.uci.ics.hyracks.api.dataflow;
import java.io.Serializable;
-import java.util.UUID;
-public final class ActivityNodeId implements Serializable {
+public final class ActivityId implements Serializable {
private static final long serialVersionUID = 1L;
private final OperatorDescriptorId odId;
- private final UUID id;
+ private final long id;
- public ActivityNodeId(OperatorDescriptorId odId, UUID id) {
+ public ActivityId(OperatorDescriptorId odId, long id) {
this.odId = odId;
this.id = id;
}
@@ -31,13 +30,13 @@
return odId;
}
- public UUID getLocalId() {
+ public long getLocalId() {
return id;
}
@Override
public int hashCode() {
- return odId.hashCode() + id.hashCode();
+ return (int) (odId.hashCode() + id);
}
@Override
@@ -45,14 +44,14 @@
if (this == o) {
return true;
}
- if (!(o instanceof ActivityNodeId)) {
+ if (!(o instanceof ActivityId)) {
return false;
}
- ActivityNodeId other = (ActivityNodeId) o;
- return other.odId.equals(odId) && other.id.equals(id);
+ ActivityId other = (ActivityId) o;
+ return other.odId.equals(odId) && other.id == id;
}
public String toString() {
- return "ANID:" + id;
+ return "ANID:[" + odId + ":" + id + "]";
}
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
similarity index 88%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
index 9a37661..6b519d6 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityNode.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivity.java
@@ -21,10 +21,8 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IOperatorEnvironment;
-public interface IActivityNode extends Serializable {
- public ActivityNodeId getActivityNodeId();
-
- public IOperatorDescriptor getOwner();
+public interface IActivity extends Serializable {
+ public ActivityId getActivityId();
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException;
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
index c6bc5e3..56870b2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
@@ -15,11 +15,11 @@
package edu.uci.ics.hyracks.api.dataflow;
public interface IActivityGraphBuilder {
- public void addTask(IActivityNode task);
+ public void addActivity(IActivity task);
- public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked);
+ public void addBlockingEdge(IActivity blocker, IActivity blocked);
- public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex);
+ public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex);
- public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex);
+ public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex);
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
index b838a80..3731459 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -63,7 +63,7 @@
* @param builder
* - graph builder
*/
- public void contributeTaskGraph(IActivityGraphBuilder builder);
+ public void contributeActivities(IActivityGraphBuilder builder);
/**
* Contributes any scheduling constraints imposed by this operator.
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
deleted file mode 100644
index 844266d..0000000
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/PortInstanceId.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataflow;
-
-import java.io.Serializable;
-
-public final class PortInstanceId implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private OperatorDescriptorId odId;
- private Direction direction;
- private int portIndex;
- private int partition;
-
- public PortInstanceId(OperatorDescriptorId odId, Direction direction, int portIndex, int partition) {
- this.odId = odId;
- this.direction = direction;
- this.portIndex = portIndex;
- this.partition = partition;
- }
-
- public OperatorDescriptorId getOperatorId() {
- return odId;
- }
-
- public Direction getDirection() {
- return direction;
- }
-
- public int getPortIndex() {
- return portIndex;
- }
-
- public int getPartition() {
- return partition;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((direction == null) ? 0 : direction.hashCode());
- result = prime * result + ((odId == null) ? 0 : odId.hashCode());
- result = prime * result + partition;
- result = prime * result + portIndex;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- PortInstanceId other = (PortInstanceId) obj;
- if (direction == null) {
- if (other.direction != null)
- return false;
- } else if (!direction.equals(other.direction))
- return false;
- if (odId == null) {
- if (other.odId != null)
- return false;
- } else if (!odId.equals(other.odId))
- return false;
- if (partition != other.partition)
- return false;
- if (portIndex != other.portIndex)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return odId + ":" + direction + ":" + partition + ":" + portIndex;
- }
-}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
index 9283b8e..4420279 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/TaskId.java
@@ -19,16 +19,16 @@
public final class TaskId implements Serializable {
private static final long serialVersionUID = 1L;
- private final ActivityNodeId activityId;
+ private final ActivityId activityId;
private final int partition;
- public TaskId(ActivityNodeId activityId, int partition) {
+ public TaskId(ActivityId activityId, int partition) {
this.activityId = activityId;
this.partition = partition;
}
- public ActivityNodeId getActivityId() {
+ public ActivityId getActivityId() {
return activityId;
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
similarity index 84%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
index da659ce..d6d6fee 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicy.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
import java.io.Serializable;
@@ -20,4 +20,8 @@
public boolean requiresProducerConsumerCoscheduling();
public boolean consumerWaitsForProducerToFinish();
+
+ public boolean materializeOnSendSide();
+
+ public boolean materializeOnReceiveSide();
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
similarity index 66%
copy from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
index da659ce..ca684e4 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/IConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/IConnectorPolicyAssignmentPolicy.java
@@ -12,12 +12,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
import java.io.Serializable;
-public interface IConnectorPolicy extends Serializable {
- public boolean requiresProducerConsumerCoscheduling();
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
- public boolean consumerWaitsForProducerToFinish();
+public interface IConnectorPolicyAssignmentPolicy extends Serializable {
+ public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+ int[] fanouts);
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/PipelinedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
similarity index 80%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/PipelinedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
index bc98e93..ba9d3a5 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/PipelinedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/PipelinedConnectorPolicy.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
public final class PipelinedConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@@ -26,4 +26,14 @@
public boolean consumerWaitsForProducerToFinish() {
return true;
}
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return false;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
similarity index 80%
rename from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
index 235cd3c..ffd3722 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedConnectorPolicy.java
@@ -12,7 +12,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@@ -26,4 +26,14 @@
public boolean consumerWaitsForProducerToFinish() {
return true;
}
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return false;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
similarity index 72%
copy from hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
index 235cd3c..0a02e58 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/connectors/SendSideMaterializedReceiveSideMaterializedConnectorPolicy.java
@@ -12,9 +12,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.control.common.job.dataflow;
+package edu.uci.ics.hyracks.api.dataflow.connectors;
-public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
+public final class SendSideMaterializedReceiveSideMaterializedConnectorPolicy implements IConnectorPolicy {
private static final long serialVersionUID = 1L;
@Override
@@ -26,4 +26,14 @@
public boolean consumerWaitsForProducerToFinish() {
return true;
}
+
+ @Override
+ public boolean materializeOnSendSide() {
+ return true;
+ }
+
+ @Override
+ public boolean materializeOnReceiveSide() {
+ return true;
+ }
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index d2a4f0f..f369f95 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -22,9 +22,9 @@
import java.util.Map;
import java.util.Set;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
@@ -40,34 +40,34 @@
private final EnumSet<JobFlag> jobFlags;
- private final Map<ActivityNodeId, IActivityNode> activityNodes;
+ private final Map<ActivityId, IActivity> activityNodes;
- private final Map<ActivityNodeId, Set<ActivityNodeId>> blocker2blockedMap;
+ private final Map<ActivityId, Set<ActivityId>> blocker2blockedMap;
- private final Map<ActivityNodeId, Set<ActivityNodeId>> blocked2blockerMap;
+ private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
- private final Map<OperatorDescriptorId, Set<ActivityNodeId>> operatorActivityMap;
+ private final Map<OperatorDescriptorId, Set<ActivityId>> operatorActivityMap;
- private final Map<ActivityNodeId, List<Integer>> activityInputMap;
+ private final Map<ActivityId, List<Integer>> activityInputMap;
- private final Map<ActivityNodeId, List<Integer>> activityOutputMap;
+ private final Map<ActivityId, List<Integer>> activityOutputMap;
- private final Map<OperatorDescriptorId, List<ActivityNodeId>> operatorInputMap;
+ private final Map<OperatorDescriptorId, List<ActivityId>> operatorInputMap;
- private final Map<OperatorDescriptorId, List<ActivityNodeId>> operatorOutputMap;
+ private final Map<OperatorDescriptorId, List<ActivityId>> operatorOutputMap;
public JobActivityGraph(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
this.appName = appName;
this.jobSpec = jobSpec;
this.jobFlags = jobFlags;
- activityNodes = new HashMap<ActivityNodeId, IActivityNode>();
- blocker2blockedMap = new HashMap<ActivityNodeId, Set<ActivityNodeId>>();
- blocked2blockerMap = new HashMap<ActivityNodeId, Set<ActivityNodeId>>();
- operatorActivityMap = new HashMap<OperatorDescriptorId, Set<ActivityNodeId>>();
- activityInputMap = new HashMap<ActivityNodeId, List<Integer>>();
- activityOutputMap = new HashMap<ActivityNodeId, List<Integer>>();
- operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityNodeId>>();
- operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityNodeId>>();
+ activityNodes = new HashMap<ActivityId, IActivity>();
+ blocker2blockedMap = new HashMap<ActivityId, Set<ActivityId>>();
+ blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+ operatorActivityMap = new HashMap<OperatorDescriptorId, Set<ActivityId>>();
+ activityInputMap = new HashMap<ActivityId, List<Integer>>();
+ activityOutputMap = new HashMap<ActivityId, List<Integer>>();
+ operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
+ operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
}
public String getApplicationName() {
@@ -82,39 +82,39 @@
return jobFlags;
}
- public Map<ActivityNodeId, IActivityNode> getActivityNodeMap() {
+ public Map<ActivityId, IActivity> getActivityNodeMap() {
return activityNodes;
}
- public Map<ActivityNodeId, Set<ActivityNodeId>> getBlocker2BlockedMap() {
+ public Map<ActivityId, Set<ActivityId>> getBlocker2BlockedMap() {
return blocker2blockedMap;
}
- public Map<ActivityNodeId, Set<ActivityNodeId>> getBlocked2BlockerMap() {
+ public Map<ActivityId, Set<ActivityId>> getBlocked2BlockerMap() {
return blocked2blockerMap;
}
- public Map<OperatorDescriptorId, Set<ActivityNodeId>> getOperatorActivityMap() {
+ public Map<OperatorDescriptorId, Set<ActivityId>> getOperatorActivityMap() {
return operatorActivityMap;
}
- public Map<ActivityNodeId, List<Integer>> getActivityInputMap() {
+ public Map<ActivityId, List<Integer>> getActivityInputMap() {
return activityInputMap;
}
- public Map<ActivityNodeId, List<Integer>> getActivityOutputMap() {
+ public Map<ActivityId, List<Integer>> getActivityOutputMap() {
return activityOutputMap;
}
- public Map<OperatorDescriptorId, List<ActivityNodeId>> getOperatorInputMap() {
+ public Map<OperatorDescriptorId, List<ActivityId>> getOperatorInputMap() {
return operatorInputMap;
}
- public Map<OperatorDescriptorId, List<ActivityNodeId>> getOperatorOutputMap() {
+ public Map<OperatorDescriptorId, List<ActivityId>> getOperatorOutputMap() {
return operatorOutputMap;
}
- public List<IConnectorDescriptor> getActivityInputConnectorDescriptors(ActivityNodeId hanId) {
+ public List<IConnectorDescriptor> getActivityInputConnectorDescriptors(ActivityId hanId) {
List<Integer> inputIndexes = activityInputMap.get(hanId);
if (inputIndexes == null) {
return null;
@@ -127,7 +127,7 @@
return inputs;
}
- public List<IConnectorDescriptor> getActivityOutputConnectorDescriptors(ActivityNodeId hanId) {
+ public List<IConnectorDescriptor> getActivityOutputConnectorDescriptors(ActivityId hanId) {
List<Integer> outputIndexes = activityOutputMap.get(hanId);
if (outputIndexes == null) {
return null;
@@ -140,14 +140,14 @@
return outputs;
}
- public ActivityNodeId getConsumerActivity(ConnectorDescriptorId cdId) {
+ public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
.getConnectorOperatorMap().get(cdId);
OperatorDescriptorId consumerOpId = connEdge.second.first.getOperatorId();
int consumerInputIdx = connEdge.second.second;
- for (ActivityNodeId anId : operatorActivityMap.get(consumerOpId)) {
+ for (ActivityId anId : operatorActivityMap.get(consumerOpId)) {
List<Integer> anInputs = activityInputMap.get(anId);
if (anInputs != null) {
for (Integer idx : anInputs) {
@@ -160,14 +160,14 @@
return null;
}
- public ActivityNodeId getProducerActivity(ConnectorDescriptorId cdId) {
+ public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
.getConnectorOperatorMap().get(cdId);
OperatorDescriptorId producerOpId = connEdge.first.first.getOperatorId();
int producerInputIdx = connEdge.first.second;
- for (ActivityNodeId anId : operatorActivityMap.get(producerOpId)) {
+ for (ActivityId anId : operatorActivityMap.get(producerOpId)) {
List<Integer> anOutputs = activityOutputMap.get(anId);
if (anOutputs != null) {
for (Integer idx : anOutputs) {
@@ -180,12 +180,12 @@
return null;
}
- public RecordDescriptor getActivityInputRecordDescriptor(ActivityNodeId hanId, int inputIndex) {
+ public RecordDescriptor getActivityInputRecordDescriptor(ActivityId hanId, int inputIndex) {
int opInputIndex = getActivityInputMap().get(hanId).get(inputIndex);
return jobSpec.getOperatorInputRecordDescriptor(hanId.getOperatorDescriptorId(), opInputIndex);
}
- public RecordDescriptor getActivityOutputRecordDescriptor(ActivityNodeId hanId, int outputIndex) {
+ public RecordDescriptor getActivityOutputRecordDescriptor(ActivityId hanId, int outputIndex) {
int opOutputIndex = getActivityOutputMap().get(hanId).get(outputIndex);
return jobSpec.getOperatorOutputRecordDescriptor(hanId.getOperatorDescriptorId(), opOutputIndex);
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index ab5397d..79d10ab 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.util.Pair;
@@ -53,6 +54,8 @@
private final Set<Constraint> userConstraints;
+ private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+
private int maxAttempts;
public JobSpecification() {
@@ -173,6 +176,14 @@
return roots;
}
+ public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+ return connectorPolicyAssignmentPolicy;
+ }
+
+ public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+ this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+ }
+
public void setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ISliver.java
similarity index 63%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ISliver.java
index c884998..135a77c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ISliver.java
@@ -12,9 +12,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataflow;
+package edu.uci.ics.hyracks.api.things;
-public enum Direction {
- INPUT,
- OUTPUT,
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public interface ISliver {
+ public void open() throws HyracksDataException;
+
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
+
+ public void commit() throws HyracksDataException;
+
+ public void abort();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingDescriptor.java
similarity index 78%
copy from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
copy to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingDescriptor.java
index c884998..1269e35 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingDescriptor.java
@@ -12,9 +12,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataflow;
+package edu.uci.ics.hyracks.api.things;
-public enum Direction {
- INPUT,
- OUTPUT,
+import java.io.Serializable;
+
+public interface IThingDescriptor extends Serializable {
+ public ThingDescriptorId getThingId();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingPartition.java
similarity index 84%
rename from hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
rename to hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingPartition.java
index c884998..cb79b6f 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/Direction.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/IThingPartition.java
@@ -12,9 +12,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package edu.uci.ics.hyracks.api.dataflow;
+package edu.uci.ics.hyracks.api.things;
-public enum Direction {
- INPUT,
- OUTPUT,
+public interface IThingPartition {
+ public ISliver createSliver();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
new file mode 100644
index 0000000..9adf482
--- /dev/null
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/things/ThingDescriptorId.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.things;
+
+import java.io.Serializable;
+
+public final class ThingDescriptorId implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final long id;
+
+ public ThingDescriptorId(long id) {
+ this.id = id;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public String toString() {
+ return "TID: " + id;
+ }
+
+ public int hashCode() {
+ return (int) (id & 0xffffffff);
+ }
+
+ public boolean equals(Object o) {
+ if (!(o instanceof ThingDescriptorId)) {
+ return false;
+ }
+ return id == ((ThingDescriptorId) o).id;
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index bf4dde1..c802eb8 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -19,38 +19,42 @@
import java.util.Map;
import java.util.Set;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
public class ActivityCluster {
private final JobRun jobRun;
- private final Set<ActivityNodeId> activities;
+ private final Set<ActivityId> activities;
private final Set<ActivityCluster> dependencies;
private final Set<ActivityCluster> dependents;
- private final Map<ActivityNodeId, Task[]> taskStateMap;
+ private final Map<ActivityId, Task[]> taskStateMap;
private TaskCluster[] taskClusters;
+ private Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap;
+
private IActivityClusterStateMachine acsm;
private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
- public ActivityCluster(JobRun jobRun, Set<ActivityNodeId> activities) {
+ public ActivityCluster(JobRun jobRun, Set<ActivityId> activities) {
this.jobRun = jobRun;
this.activities = activities;
dependencies = new HashSet<ActivityCluster>();
dependents = new HashSet<ActivityCluster>();
- taskStateMap = new HashMap<ActivityNodeId, Task[]>();
+ taskStateMap = new HashMap<ActivityId, Task[]>();
+ partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
}
- public Set<ActivityNodeId> getActivities() {
+ public Set<ActivityId> getActivities() {
return activities;
}
@@ -66,7 +70,7 @@
return dependencies;
}
- public Map<ActivityNodeId, Task[]> getTaskMap() {
+ public Map<ActivityId, Task[]> getTaskMap() {
return taskStateMap;
}
@@ -78,6 +82,10 @@
this.taskClusters = taskClusters;
}
+ public Map<PartitionId, TaskCluster> getPartitionProducingTaskClusterMap() {
+ return partitionProducingTaskClusterMap;
+ }
+
public IActivityClusterStateMachine getStateMachine() {
return acsm;
}
@@ -91,7 +99,7 @@
}
public int getMaxTaskClusterAttempts() {
- return 1;
+ return jobRun.getJobActivityGraph().getJobSpecification().getMaxAttempts();
}
public void notifyTaskClusterFailure(TaskClusterAttempt tcAttempt, Exception exception) throws HyracksException {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
index 9f9a2e3..6effd84 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
@@ -9,8 +9,9 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -21,37 +22,38 @@
private JobActivityGraph jag;
@Override
- public void addBlockingEdge(IActivityNode blocker, IActivityNode blocked) {
- addToValueSet(jag.getBlocker2BlockedMap(), blocker.getActivityNodeId(), blocked.getActivityNodeId());
- addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityNodeId(), blocker.getActivityNodeId());
+ public void addBlockingEdge(IActivity blocker, IActivity blocked) {
+ addToValueSet(jag.getBlocker2BlockedMap(), blocker.getActivityId(), blocked.getActivityId());
+ addToValueSet(jag.getBlocked2BlockerMap(), blocked.getActivityId(), blocker.getActivityId());
}
@Override
- public void addSourceEdge(int operatorInputIndex, IActivityNode task, int taskInputIndex) {
+ public void addSourceEdge(int operatorInputIndex, IActivity task, int taskInputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Adding source edge: " + task.getOwner().getOperatorId() + ":" + operatorInputIndex + " -> "
- + task.getActivityNodeId() + ":" + taskInputIndex);
+ LOGGER.finest("Adding source edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
+ + operatorInputIndex + " -> " + task.getActivityId() + ":" + taskInputIndex);
}
- insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityNodeId(), taskInputIndex, operatorInputIndex);
- insertIntoIndexedMap(jag.getOperatorInputMap(), task.getOwner().getOperatorId(), operatorInputIndex,
- task.getActivityNodeId());
+ insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, operatorInputIndex);
+ insertIntoIndexedMap(jag.getOperatorInputMap(), task.getActivityId().getOperatorDescriptorId(),
+ operatorInputIndex, task.getActivityId());
}
@Override
- public void addTargetEdge(int operatorOutputIndex, IActivityNode task, int taskOutputIndex) {
+ public void addTargetEdge(int operatorOutputIndex, IActivity task, int taskOutputIndex) {
if (LOGGER.isLoggable(Level.FINEST)) {
- LOGGER.finest("Adding target edge: " + task.getOwner().getOperatorId() + ":" + operatorOutputIndex + " -> "
- + task.getActivityNodeId() + ":" + taskOutputIndex);
+ LOGGER.finest("Adding target edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
+ + operatorOutputIndex + " -> " + task.getActivityId() + ":" + taskOutputIndex);
}
- insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityNodeId(), taskOutputIndex, operatorOutputIndex);
- insertIntoIndexedMap(jag.getOperatorOutputMap(), task.getOwner().getOperatorId(), operatorOutputIndex,
- task.getActivityNodeId());
+ insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, operatorOutputIndex);
+ insertIntoIndexedMap(jag.getOperatorOutputMap(), task.getActivityId().getOperatorDescriptorId(),
+ operatorOutputIndex, task.getActivityId());
}
@Override
- public void addTask(IActivityNode task) {
- jag.getActivityNodeMap().put(task.getActivityNodeId(), task);
- addToValueSet(jag.getOperatorActivityMap(), task.getOwner().getOperatorId(), task.getActivityNodeId());
+ public void addActivity(IActivity task) {
+ ActivityId activityId = task.getActivityId();
+ jag.getActivityNodeMap().put(activityId, task);
+ addToValueSet(jag.getOperatorActivityMap(), activityId.getOperatorDescriptorId(), activityId);
}
private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
index fb7dbf8..359bf2e 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobRun.java
@@ -21,7 +21,7 @@
import java.util.UUID;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -42,7 +42,7 @@
private final JobProfile profile;
- private final Map<ActivityNodeId, ActivityCluster> activityClusterMap;
+ private final Map<ActivityId, ActivityCluster> activityClusterMap;
private IJobRunStateMachine jsm;
@@ -57,7 +57,7 @@
partitionRequestorMap = new HashMap<PartitionId, String>();
participatingNodeIds = new HashSet<String>();
profile = new JobProfile(jobId);
- activityClusterMap = new HashMap<ActivityNodeId, ActivityCluster>();
+ activityClusterMap = new HashMap<ActivityId, ActivityCluster>();
}
public UUID getJobId() {
@@ -116,7 +116,7 @@
return partitionRequestorMap;
}
- public Map<ActivityNodeId, ActivityCluster> getActivityClusterMap() {
+ public Map<ActivityId, ActivityCluster> getActivityClusterMap() {
return activityClusterMap;
}
}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
index 068ec5f..b82dafd 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/TaskCluster.java
@@ -20,12 +20,17 @@
import java.util.Set;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class TaskCluster {
private final ActivityCluster activityCluster;
private final Task[] tasks;
+ private final Set<PartitionId> producedPartitions;
+
+ private final Set<PartitionId> requiredPartitions;
+
private final Set<TaskCluster> blockers;
private final Set<TaskCluster> dependencies;
@@ -35,6 +40,8 @@
public TaskCluster(ActivityCluster activityCluster, Task[] tasks) {
this.activityCluster = activityCluster;
this.tasks = tasks;
+ this.producedPartitions = new HashSet<PartitionId>();
+ this.requiredPartitions = new HashSet<PartitionId>();
this.blockers = new HashSet<TaskCluster>();
this.dependencies = new HashSet<TaskCluster>();
taskClusterAttempts = new ArrayList<TaskClusterAttempt>();
@@ -48,6 +55,14 @@
return dependencies;
}
+ public Set<PartitionId> getProducedPartitions() {
+ return producedPartitions;
+ }
+
+ public Set<PartitionId> getRequiredPartitions() {
+ return requiredPartitions;
+ }
+
public Set<TaskCluster> getBlockers() {
return blockers;
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index 1fb4bc3..643cc18 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -18,7 +18,7 @@
import java.util.Map;
import java.util.UUID;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
@@ -48,10 +48,10 @@
JobRun run = ccs.getRunMap().get(jobId);
if (run != null) {
TaskId tid = taId.getTaskId();
- Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
+ Map<ActivityId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
if (ac != null) {
- Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+ Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
Task[] taskStates = taskStateMap.get(tid.getActivityId());
if (taskStates != null && taskStates.length > tid.getPartition()) {
Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
index 9e47d8a..603623b 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/JobCreateEvent.java
@@ -60,7 +60,7 @@
PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
@Override
public void visit(IOperatorDescriptor op) {
- op.contributeTaskGraph(builder);
+ op.contributeActivities(builder);
}
});
final JobActivityGraph jag = builder.getActivityGraph();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
index e9861d8..530d00a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionAvailibilityEvent.java
@@ -46,7 +46,7 @@
partitionAvailabilityMap.put(pid, networkAddress);
Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
- String requestor = partitionRequestorMap.get(pid);
+ String requestor = partitionRequestorMap.remove(pid);
if (requestor != null) {
NodeControllerState ncs = ccs.getNodeMap().get(requestor);
if (ncs != null) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
index e7c92e7..0055946 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/RegisterPartitionRequestEvent.java
@@ -46,8 +46,6 @@
if (run == null) {
return;
}
- Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
- partitionRequestorMap.put(pid, nodeId);
Map<PartitionId, NetworkAddress> partitionAvailabilityMap = run.getPartitionAvailabilityMap();
NetworkAddress networkAddress = partitionAvailabilityMap.get(pid);
@@ -58,6 +56,9 @@
} catch (Exception e) {
e.printStackTrace();
}
+ } else {
+ Map<PartitionId, String> partitionRequestorMap = run.getPartitionRequestorMap();
+ partitionRequestorMap.put(pid, nodeId);
}
}
}
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/PortMapMergingAccumulator.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/PortMapMergingAccumulator.java
deleted file mode 100644
index 8cffd08..0000000
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/remote/ops/PortMapMergingAccumulator.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.remote.ops;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.Endpoint;
-import edu.uci.ics.hyracks.api.dataflow.PortInstanceId;
-import edu.uci.ics.hyracks.control.cc.remote.Accumulator;
-
-public class PortMapMergingAccumulator implements
- Accumulator<Map<PortInstanceId, Endpoint>, Map<PortInstanceId, Endpoint>> {
- Map<PortInstanceId, Endpoint> portMap = new HashMap<PortInstanceId, Endpoint>();
-
- @Override
- public void accumulate(Map<PortInstanceId, Endpoint> o) {
- portMap.putAll(o);
- }
-
- @Override
- public Map<PortInstanceId, Endpoint> getResult() {
- return portMap;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index c325433..15c2db7 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
@@ -46,7 +47,6 @@
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
@@ -146,19 +146,25 @@
inProgressTaskClusters.add(tc);
}
+ private TaskClusterAttempt findLastTaskClusterAttempt(TaskCluster tc) {
+ List<TaskClusterAttempt> attempts = tc.getAttempts();
+ if (!attempts.isEmpty()) {
+ return attempts.get(attempts.size() - 1);
+ }
+ return null;
+ }
+
@Override
public void notifyTaskComplete(TaskAttempt ta) throws HyracksException {
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTaskState().getTaskCluster();
- List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
- int lastAttempt = tcAttempts.size() - 1;
- if (taId.getAttempt() == lastAttempt) {
- TaskClusterAttempt tcAttempt = tcAttempts.get(lastAttempt);
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+ if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
TaskAttempt.TaskStatus taStatus = ta.getStatus();
if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
ta.setStatus(TaskAttempt.TaskStatus.COMPLETED, null);
- if (tcAttempt.decrementPendingTasksCounter() == 0) {
- tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
+ if (lastAttempt.decrementPendingTasksCounter() == 0) {
+ lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.COMPLETED);
inProgressTaskClusters.remove(tc);
startRunnableTaskClusters();
}
@@ -171,42 +177,19 @@
}
private void startRunnableTaskClusters() throws HyracksException {
- TaskCluster[] taskClusters = ac.getTaskClusters();
-
+ Set<TaskCluster> runnableTaskClusters = new HashSet<TaskCluster>();
+ findRunnableTaskClusters(runnableTaskClusters);
Map<String, List<TaskAttemptDescriptor>> taskAttemptMap = new HashMap<String, List<TaskAttemptDescriptor>>();
- for (TaskCluster tc : taskClusters) {
- Set<TaskCluster> dependencies = tc.getDependencies();
- List<TaskClusterAttempt> attempts = tc.getAttempts();
- if (!attempts.isEmpty()) {
- TaskClusterAttempt lastAttempt = attempts.get(attempts.size() - 1);
- if (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED
- || lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
- continue;
+ for (TaskCluster tc : runnableTaskClusters) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
+ List<TaskClusterAttempt> attempts = tc.getAttempts();
+ LOGGER.info("Attempts so far:" + attempts.size());
+ for (TaskClusterAttempt tcAttempt : attempts) {
+ LOGGER.info("Status: " + tcAttempt.getStatus());
}
}
- boolean runnable = true;
- for (TaskCluster depTC : dependencies) {
- List<TaskClusterAttempt> tcAttempts = depTC.getAttempts();
- if (tcAttempts.isEmpty()) {
- runnable = false;
- break;
- }
- TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
- if (tcAttempt.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
- runnable = false;
- break;
- }
- }
- if (runnable) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("Found runnable TC: " + Arrays.toString(tc.getTasks()));
- LOGGER.info("Attempts so far:" + attempts.size());
- for (TaskClusterAttempt tcAttempt : attempts) {
- LOGGER.info("Status: " + tcAttempt.getStatus());
- }
- }
- assignTaskLocations(tc, taskAttemptMap);
- }
+ assignTaskLocations(tc, taskAttemptMap);
}
if (taskAttemptMap.isEmpty()) {
@@ -219,7 +202,63 @@
startTasks(taskAttemptMap);
}
- private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) {
+ private void findRunnableTaskClusters(Set<TaskCluster> runnableTaskClusters) {
+ TaskCluster[] taskClusters = ac.getTaskClusters();
+
+ for (TaskCluster tc : taskClusters) {
+ Set<TaskCluster> blockers = tc.getBlockers();
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+ if (lastAttempt != null
+ && (lastAttempt.getStatus() == TaskClusterAttempt.TaskClusterStatus.COMPLETED || lastAttempt
+ .getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING)) {
+ continue;
+ }
+ boolean runnable = true;
+ for (TaskCluster blockerTC : blockers) {
+ List<TaskClusterAttempt> tcAttempts = blockerTC.getAttempts();
+ if (tcAttempts.isEmpty()) {
+ runnable = false;
+ break;
+ }
+ TaskClusterAttempt tcAttempt = tcAttempts.get(tcAttempts.size() - 1);
+ if (tcAttempt.getStatus() != TaskClusterAttempt.TaskClusterStatus.COMPLETED) {
+ runnable = false;
+ break;
+ }
+ }
+ if (runnable) {
+ runnableTaskClusters.add(tc);
+ }
+ }
+ }
+
+ private void findCascadingAbortTaskClusterAttempts(TaskClusterAttempt abortedTCAttempt,
+ Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts) {
+ boolean changed = true;
+ cascadingAbortTaskClusterAttempts.add(abortedTCAttempt);
+ while (changed) {
+ changed = false;
+ for (TaskCluster tc : ac.getTaskClusters()) {
+ TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
+ if (tca != null && tca.getStatus() == TaskClusterAttempt.TaskClusterStatus.RUNNING) {
+ boolean abort = false;
+ for (TaskClusterAttempt catca : cascadingAbortTaskClusterAttempts) {
+ TaskCluster catc = catca.getTaskCluster();
+ if (tc.getDependencies().contains(catc)) {
+ abort = true;
+ break;
+ }
+ }
+ if (abort) {
+ changed = cascadingAbortTaskClusterAttempts.add(tca) || changed;
+ }
+ }
+ }
+ }
+ cascadingAbortTaskClusterAttempts.remove(abortedTCAttempt);
+ }
+
+ private void startTasks(Map<String, List<TaskAttemptDescriptor>> taskAttemptMap) throws HyracksException {
Executor executor = ccs.getExecutor();
JobRun jobRun = ac.getJobRun();
final UUID jobId = jobRun.getJobId();
@@ -249,7 +288,7 @@
}
}
- private void abortTaskCluster(TaskClusterAttempt tcAttempt) {
+ private void abortTaskCluster(TaskClusterAttempt tcAttempt) throws HyracksException {
Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<String, List<TaskAttemptId>>();
for (TaskAttempt ta2 : tcAttempt.getTaskAttempts()) {
if (ta2.getStatus() == TaskAttempt.TaskStatus.RUNNING) {
@@ -281,23 +320,26 @@
});
}
}
-
}
@Override
public void notifyTaskFailure(TaskAttempt ta, Exception exception) throws HyracksException {
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTaskState().getTaskCluster();
- List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
- int lastAttempt = tcAttempts.size() - 1;
- if (taId.getAttempt() == lastAttempt) {
- TaskClusterAttempt tcAttempt = tcAttempts.get(lastAttempt);
+ TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
+ if (lastAttempt != null && taId.getAttempt() == lastAttempt.getAttempt()) {
TaskAttempt.TaskStatus taStatus = ta.getStatus();
if (taStatus == TaskAttempt.TaskStatus.RUNNING) {
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exception);
- abortTaskCluster(tcAttempt);
- tcAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
- ac.notifyTaskClusterFailure(tcAttempt, exception);
+ abortTaskCluster(lastAttempt);
+ lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
+ Set<TaskClusterAttempt> cascadingAbortTaskClusterAttempts = new HashSet<TaskClusterAttempt>();
+ findCascadingAbortTaskClusterAttempts(lastAttempt, cascadingAbortTaskClusterAttempts);
+ for (TaskClusterAttempt tca : cascadingAbortTaskClusterAttempts) {
+ abortTaskCluster(tca);
+ tca.setStatus(TaskClusterAttempt.TaskClusterStatus.ABORTED);
+ }
+ ac.notifyTaskClusterFailure(lastAttempt, exception);
} else {
LOGGER.warning("Spurious task complete notification: " + taId + " Current state = " + taStatus);
}
@@ -307,7 +349,7 @@
}
@Override
- public void abort() {
+ public void abort() throws HyracksException {
TaskCluster[] taskClusters = ac.getTaskClusters();
for (TaskCluster tc : taskClusters) {
List<TaskClusterAttempt> tcAttempts = tc.getAttempts();
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 5737743..a15469a 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -29,17 +29,21 @@
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipelinedConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.util.Pair;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
@@ -50,9 +54,6 @@
import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
public class DefaultJobRunStateMachine implements IJobRunStateMachine {
private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
@@ -87,33 +88,35 @@
return solver;
}
- private static Pair<ActivityNodeId, ActivityNodeId> findMergePair(JobActivityGraph jag, JobSpecification spec,
+ private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
Set<ActivityCluster> eqSets) {
- Map<ActivityNodeId, IActivityNode> activityNodeMap = jag.getActivityNodeMap();
+ Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
for (ActivityCluster eqSet : eqSets) {
- for (ActivityNodeId t : eqSet.getActivities()) {
- IOperatorDescriptor owner = activityNodeMap.get(t).getOwner();
+ for (ActivityId t : eqSet.getActivities()) {
+ IActivity activity = activityNodeMap.get(t);
List<Integer> inputList = jag.getActivityInputMap().get(t);
if (inputList != null) {
for (Integer idx : inputList) {
- IConnectorDescriptor conn = spec.getInputConnectorDescriptor(owner, idx);
+ IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
+ .getOperatorDescriptorId(), idx);
OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
int producerOutputIndex = spec.getProducerOutputIndex(conn);
- ActivityNodeId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+ ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
if (!eqSet.getActivities().contains(inTask)) {
- return new Pair<ActivityNodeId, ActivityNodeId>(t, inTask);
+ return new Pair<ActivityId, ActivityId>(t, inTask);
}
}
}
List<Integer> outputList = jag.getActivityOutputMap().get(t);
if (outputList != null) {
for (Integer idx : outputList) {
- IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(owner, idx);
+ IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
+ .getOperatorDescriptorId(), idx);
OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
int consumerInputIndex = spec.getConsumerInputIndex(conn);
- ActivityNodeId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+ ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
if (!eqSet.getActivities().contains(outTask)) {
- return new Pair<ActivityNodeId, ActivityNodeId>(t, outTask);
+ return new Pair<ActivityId, ActivityId>(t, outTask);
}
}
}
@@ -128,11 +131,11 @@
/*
* Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
*/
- Map<ActivityNodeId, ActivityCluster> stageMap = new HashMap<ActivityNodeId, ActivityCluster>();
+ Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
- for (Set<ActivityNodeId> taskIds : jag.getOperatorActivityMap().values()) {
- for (ActivityNodeId taskId : taskIds) {
- Set<ActivityNodeId> eqSet = new HashSet<ActivityNodeId>();
+ for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
+ for (ActivityId taskId : taskIds) {
+ Set<ActivityId> eqSet = new HashSet<ActivityId>();
eqSet.add(taskId);
ActivityCluster stage = new ActivityCluster(jobRun, eqSet);
stageMap.put(taskId, stage);
@@ -143,23 +146,23 @@
boolean changed = true;
while (changed) {
changed = false;
- Pair<ActivityNodeId, ActivityNodeId> pair = findMergePair(jag, spec, stages);
+ Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
if (pair != null) {
merge(stageMap, stages, pair.first, pair.second);
changed = true;
}
}
- ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityNodeId>());
- Map<ActivityNodeId, Set<ActivityNodeId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
+ ActivityCluster endStage = new ActivityCluster(jobRun, new HashSet<ActivityId>());
+ Map<ActivityId, Set<ActivityId>> blocker2BlockedMap = jag.getBlocker2BlockedMap();
for (ActivityCluster s : stages) {
endStage.addDependency(s);
s.addDependent(endStage);
Set<ActivityCluster> blockedStages = new HashSet<ActivityCluster>();
- for (ActivityNodeId t : s.getActivities()) {
- Set<ActivityNodeId> blockedTasks = blocker2BlockedMap.get(t);
+ for (ActivityId t : s.getActivities()) {
+ Set<ActivityId> blockedTasks = blocker2BlockedMap.get(t);
if (blockedTasks != null) {
- for (ActivityNodeId bt : blockedTasks) {
+ for (ActivityId bt : blockedTasks) {
blockedStages.add(stageMap.get(bt));
}
}
@@ -180,14 +183,14 @@
return endStage;
}
- private void merge(Map<ActivityNodeId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityNodeId t1,
- ActivityNodeId t2) {
+ private void merge(Map<ActivityId, ActivityCluster> eqSetMap, Set<ActivityCluster> eqSets, ActivityId t1,
+ ActivityId t2) {
ActivityCluster stage1 = eqSetMap.get(t1);
- Set<ActivityNodeId> s1 = stage1.getActivities();
+ Set<ActivityId> s1 = stage1.getActivities();
ActivityCluster stage2 = eqSetMap.get(t2);
- Set<ActivityNodeId> s2 = stage2.getActivities();
+ Set<ActivityId> s2 = stage2.getActivities();
- Set<ActivityNodeId> mergedSet = new HashSet<ActivityNodeId>();
+ Set<ActivityId> mergedSet = new HashSet<ActivityId>();
mergedSet.addAll(s1);
mergedSet.addAll(s2);
@@ -196,7 +199,7 @@
ActivityCluster mergedStage = new ActivityCluster(jobRun, mergedSet);
eqSets.add(mergedStage);
- for (ActivityNodeId t : mergedSet) {
+ for (ActivityId t : mergedSet) {
eqSetMap.put(t, mergedStage);
}
}
@@ -275,10 +278,10 @@
}
}
- private Map<ActivityNodeId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
+ private Map<ActivityId, ActivityPartitionDetails> computePartitionCounts(ActivityCluster ac)
throws HyracksException {
Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
- for (ActivityNodeId anId : ac.getActivities()) {
+ for (ActivityId anId : ac.getActivities()) {
lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
}
solver.solve(lValues);
@@ -298,8 +301,8 @@
}
nPartMap.put(((PartitionCountExpression) lv).getOperatorDescriptorId(), Integer.valueOf(nParts));
}
- Map<ActivityNodeId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityNodeId, ActivityPartitionDetails>();
- for (ActivityNodeId anId : ac.getActivities()) {
+ Map<ActivityId, ActivityPartitionDetails> activityPartsMap = new HashMap<ActivityId, ActivityPartitionDetails>();
+ for (ActivityId anId : ac.getActivities()) {
int nParts = nPartMap.get(anId.getOperatorDescriptorId());
int[] nInputPartitions = null;
List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
@@ -327,61 +330,54 @@
}
private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
- Map<ActivityNodeId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
+ Map<ActivityId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
- Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+ Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
+ Set<ActivityId> activities = ac.getActivities();
- for (ActivityNodeId anId : ac.getActivities()) {
+ Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
+
+ for (ActivityId anId : activities) {
ActivityPartitionDetails apd = pcMap.get(anId);
Task[] taskStates = new Task[apd.getPartitionCount()];
for (int i = 0; i < taskStates.length; ++i) {
- taskStates[i] = new Task(new TaskId(anId, i), apd);
+ TaskId tid = new TaskId(anId, i);
+ taskStates[i] = new Task(tid, apd);
+ Set<TaskId> cluster = new HashSet<TaskId>();
+ cluster.add(tid);
+ taskClusterMap.put(tid, cluster);
}
taskStateMap.put(anId, taskStates);
}
+
Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
ac.setConnectorPolicyMap(connectorPolicies);
-
- Set<ActivityNodeId> activities = ac.getActivities();
- Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
- for (ActivityNodeId anId : activities) {
- Task[] taskStates = taskStateMap.get(anId);
- for (Task ts : taskStates) {
- Set<TaskId> cluster = new HashSet<TaskId>();
- cluster.add(ts.getTaskId());
- taskClusterMap.put(ts.getTaskId(), cluster);
- }
- }
-
- Map<TaskId, List<Pair<TaskId, IConnectorPolicy>>> connectionInfo = new HashMap<TaskId, List<Pair<TaskId, IConnectorPolicy>>>();
+ Map<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>> taskConnectivity = new HashMap<TaskId, List<Pair<TaskId, ConnectorDescriptorId>>>();
JobActivityGraph jag = jobRun.getJobActivityGraph();
BitSet targetBitmap = new BitSet();
- for (ActivityNodeId ac1 : activities) {
+ for (ActivityId ac1 : activities) {
Task[] ac1TaskStates = taskStateMap.get(ac1);
int nProducers = ac1TaskStates.length;
List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
- IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
- if (cPolicy == null) {
- cPolicy = new SendSideMaterializedConnectorPolicy();
- }
- ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
+ ActivityId ac2 = jag.getConsumerActivity(cdId);
Task[] ac2TaskStates = taskStateMap.get(ac2);
int nConsumers = ac2TaskStates.length;
for (int i = 0; i < nProducers; ++i) {
c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
- List<Pair<TaskId, IConnectorPolicy>> cInfoList = connectionInfo.get(ac1TaskStates[i]
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(ac1TaskStates[i]
.getTaskId());
if (cInfoList == null) {
- cInfoList = new ArrayList<Pair<TaskId, IConnectorPolicy>>();
- connectionInfo.put(ac1TaskStates[i].getTaskId(), cInfoList);
+ cInfoList = new ArrayList<Pair<TaskId, ConnectorDescriptorId>>();
+ taskConnectivity.put(ac1TaskStates[i].getTaskId(), cInfoList);
}
Set<TaskId> cluster = taskClusterMap.get(ac1TaskStates[i].getTaskId());
for (int j = targetBitmap.nextSetBit(0); j >= 0; j = targetBitmap.nextSetBit(j + 1)) {
- cInfoList.add(new Pair<TaskId, IConnectorPolicy>(ac2TaskStates[j].getTaskId(), cPolicy));
+ cInfoList.add(new Pair<TaskId, ConnectorDescriptorId>(ac2TaskStates[j].getTaskId(), cdId));
+ IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
if (cPolicy.requiresProducerConsumerCoscheduling()) {
cluster.add(ac2TaskStates[j].getTaskId());
}
@@ -434,17 +430,25 @@
}
ac.setTaskClusters(tcSet.toArray(new TaskCluster[tcSet.size()]));
- for (TaskCluster tc : tcSet) {
+ Map<PartitionId, TaskCluster> partitionProducingTaskClusterMap = ac.getPartitionProducingTaskClusterMap();
+ for (TaskCluster tc : ac.getTaskClusters()) {
for (Task ts : tc.getTasks()) {
TaskId tid = ts.getTaskId();
- List<Pair<TaskId, IConnectorPolicy>> cInfoList = connectionInfo.get(tid);
+ List<Pair<TaskId, ConnectorDescriptorId>> cInfoList = taskConnectivity.get(tid);
if (cInfoList != null) {
- for (Pair<TaskId, IConnectorPolicy> p : cInfoList) {
+ for (Pair<TaskId, ConnectorDescriptorId> p : cInfoList) {
Task targetTS = taskStateMap.get(p.first.getActivityId())[p.first.getPartition()];
TaskCluster targetTC = targetTS.getTaskCluster();
if (targetTC != tc) {
+ ConnectorDescriptorId cdId = p.second;
+ PartitionId pid = new PartitionId(jobRun.getJobId(), cdId, tid.getPartition(),
+ p.first.getPartition());
+ tc.getProducedPartitions().add(pid);
+ targetTC.getRequiredPartitions().add(pid);
+ partitionProducingTaskClusterMap.put(pid, tc);
+ IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
targetTC.getDependencies().add(tc);
- if (p.second.consumerWaitsForProducerToFinish()) {
+ if (cPolicy.consumerWaitsForProducerToFinish()) {
targetTC.getBlockers().add(tc);
}
}
@@ -466,20 +470,20 @@
}
private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
- Map<ActivityNodeId, ActivityPartitionDetails> pcMap) {
+ Map<ActivityId, ActivityPartitionDetails> pcMap) {
JobActivityGraph jag = jobRun.getJobActivityGraph();
Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
- Set<ActivityNodeId> activities = ac.getActivities();
- Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+ Set<ActivityId> activities = ac.getActivities();
+ Map<ActivityId, Task[]> taskStateMap = ac.getTaskMap();
BitSet targetBitmap = new BitSet();
- for (ActivityNodeId ac1 : activities) {
+ for (ActivityId ac1 : activities) {
Task[] ac1TaskStates = taskStateMap.get(ac1);
int nProducers = ac1TaskStates.length;
List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
- ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
+ ActivityId ac2 = jag.getConsumerActivity(cdId);
Task[] ac2TaskStates = taskStateMap.get(ac2);
int nConsumers = ac2TaskStates.length;
@@ -497,6 +501,11 @@
}
private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
+ IConnectorPolicyAssignmentPolicy cpap = jobRun.getJobActivityGraph().getJobSpecification()
+ .getConnectorPolicyAssignmentPolicy();
+ if (cpap != null) {
+ return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
+ }
return new PipelinedConnectorPolicy();
}
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 54b31f8..eade061 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -22,9 +22,9 @@
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
public interface INodeController extends Remote {
public String getId() throws Exception;
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 072c2ec..30434b8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -26,7 +26,6 @@
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -84,11 +83,11 @@
return jobId;
}
- public IOperatorEnvironment getEnvironment(IOperatorDescriptor hod, int partition) {
- if (!envMap.containsKey(hod.getOperatorId())) {
- envMap.put(hod.getOperatorId(), new HashMap<Integer, IOperatorEnvironment>());
+ public IOperatorEnvironment getEnvironment(OperatorDescriptorId opId, int partition) {
+ if (!envMap.containsKey(opId)) {
+ envMap.put(opId, new HashMap<Integer, IOperatorEnvironment>());
}
- Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(hod.getOperatorId());
+ Map<Integer, IOperatorEnvironment> opEnvMap = envMap.get(opId);
if (!opEnvMap.containsKey(partition)) {
opEnvMap.put(partition, new OperatorEnvironmentImpl(nodeController.getId()));
}
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index c7636f8..c4c396f 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -53,13 +53,13 @@
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -76,9 +76,6 @@
import edu.uci.ics.hyracks.control.common.base.NodeParameters;
import edu.uci.ics.hyracks.control.common.context.ServerContext;
import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
-import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
-import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -89,6 +86,7 @@
import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
+import edu.uci.ics.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
public class NodeControllerService extends AbstractRemoteService implements INodeController {
@@ -253,14 +251,14 @@
for (TaskAttemptDescriptor td : taskDescriptors) {
TaskAttemptId taId = td.getTaskAttemptId();
TaskId tid = taId.getTaskId();
- IActivityNode han = plan.getActivityNodeMap().get(tid.getActivityId());
+ IActivity han = plan.getActivityNodeMap().get(tid.getActivityId());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Initializing " + taId + " -> " + han);
}
- IOperatorDescriptor op = han.getOwner();
final int partition = tid.getPartition();
Task task = new Task(joblet, taId, han.getClass().getName());
- IOperatorNodePushable operator = han.createPushRuntime(task, joblet.getEnvironment(op, partition), rdp,
+ IOperatorNodePushable operator = han.createPushRuntime(task,
+ joblet.getEnvironment(han.getActivityId().getOperatorDescriptorId(), partition), rdp,
partition, td.getPartitionCount());
IPartitionCollector collector = null;
@@ -272,12 +270,12 @@
throw new HyracksException("Multiple inputs to an activity not currently supported");
}
IConnectorDescriptor conn = inputs.get(i);
+ IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
- collector = conn.createPartitionCollector(task, recordDesc, partition,
- td.getInputPartitionCounts()[i], td.getPartitionCount());
+ collector = createPartitionCollector(td, partition, task, i, conn, recordDesc, cPolicy);
}
}
List<IConnectorDescriptor> outputs = plan.getActivityOutputConnectorDescriptors(tid.getActivityId());
@@ -310,17 +308,21 @@
}
}
+ private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor td, final int partition, Task task,
+ int i, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy cPolicy)
+ throws HyracksDataException {
+ IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
+ td.getInputPartitionCounts()[i], td.getPartitionCount());
+ if (cPolicy.materializeOnReceiveSide()) {
+ return new ReceiveSideMaterializingCollector(ctx, partitionManager, collector, executor);
+ } else {
+ return collector;
+ }
+ }
+
private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final UUID jobId,
final IConnectorDescriptor conn, final int senderIndex) {
- if (cPolicy instanceof PipelinedConnectorPolicy) {
- return new IPartitionWriterFactory() {
- @Override
- public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
- senderIndex, receiverIndex));
- }
- };
- } else if (cPolicy instanceof SendSideMaterializedConnectorPolicy) {
+ if (cPolicy.materializeOnSendSide()) {
return new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
@@ -328,8 +330,15 @@
conn.getConnectorId(), senderIndex, receiverIndex), executor);
}
};
+ } else {
+ return new IPartitionWriterFactory() {
+ @Override
+ public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+ return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
+ senderIndex, receiverIndex));
+ }
+ };
}
- throw new IllegalArgumentException("Unknown connector policy: " + cPolicy);
}
private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, INCApplicationContext appCtx) throws Exception {
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
new file mode 100644
index 0000000..971f2b6
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class MaterializedPartitionInputChannel implements IInputChannel {
+ private final Queue<ByteBuffer> emptyQueue;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final PartitionId pid;
+
+ private final PartitionManager manager;
+
+ private final FrameWriter writer;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public MaterializedPartitionInputChannel(IHyracksRootContext ctx, int nBuffers, PartitionId pid,
+ PartitionManager manager) {
+ this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyQueue.add(ctx.allocateFrame());
+ }
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.pid = pid;
+ this.manager = manager;
+ writer = new FrameWriter();
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ synchronized (this) {
+ emptyQueue.add(buffer);
+ notifyAll();
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ MaterializedPartition partition = (MaterializedPartition) manager.getPartition(pid);
+ partition.writeTo(writer);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class FrameWriter implements IFrameWriter {
+ @Override
+ public void open() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ synchronized (MaterializedPartitionInputChannel.this) {
+ while (emptyQueue.isEmpty()) {
+ try {
+ MaterializedPartitionInputChannel.this.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ ByteBuffer destFrame = emptyQueue.poll();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ destFrame.clear();
+ destFrame.put(buffer);
+ fullQueue.add(destFrame);
+ monitor.notifyDataAvailability(MaterializedPartitionInputChannel.this, 1);
+ }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ monitor.notifyEndOfStream(MaterializedPartitionInputChannel.this);
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
new file mode 100644
index 0000000..5f3a3f4
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.comm.IFrameReader;
+import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
+import edu.uci.ics.hyracks.api.comm.PartitionChannel;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+
+public class ReceiveSideMaterializingCollector implements IPartitionCollector {
+ private final IHyracksRootContext ctx;
+
+ private PartitionManager manager;
+
+ private final IPartitionCollector delegate;
+
+ private final Executor executor;
+
+ public ReceiveSideMaterializingCollector(IHyracksRootContext ctx, PartitionManager manager,
+ IPartitionCollector collector, Executor executor) {
+ this.ctx = ctx;
+ this.manager = manager;
+ this.delegate = collector;
+ this.executor = executor;
+ }
+
+ @Override
+ public UUID getJobId() {
+ return delegate.getJobId();
+ }
+
+ @Override
+ public ConnectorDescriptorId getConnectorId() {
+ return delegate.getConnectorId();
+ }
+
+ @Override
+ public int getReceiverIndex() {
+ return delegate.getReceiverIndex();
+ }
+
+ @Override
+ public void open() throws HyracksException {
+ delegate.open();
+ }
+
+ @Override
+ public void addPartitions(Collection<PartitionChannel> partitions) throws HyracksException {
+ for (final PartitionChannel pc : partitions) {
+ PartitionWriter writer = new PartitionWriter(pc);
+ executor.execute(writer);
+ }
+ }
+
+ private class PartitionWriter implements Runnable, IInputChannelMonitor {
+ private PartitionChannel pc;
+
+ private int nAvailableFrames;
+
+ private boolean eos;
+
+ public PartitionWriter(PartitionChannel pc) {
+ this.pc = pc;
+ nAvailableFrames = 0;
+ eos = false;
+ }
+
+ @Override
+ public synchronized void notifyDataAvailability(IInputChannel channel, int nFrames) {
+ nAvailableFrames += nFrames;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void notifyEndOfStream(IInputChannel channel) {
+ eos = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void run() {
+ PartitionId pid = pc.getPartitionId();
+ MaterializedPartitionWriter mpw = new MaterializedPartitionWriter(ctx, manager, pid, executor);
+ IInputChannel channel = pc.getInputChannel();
+ try {
+ channel.registerMonitor(this);
+ channel.open();
+ mpw.open();
+ while (true) {
+ if (nAvailableFrames > 0) {
+ ByteBuffer buffer = channel.getNextBuffer();
+ --nAvailableFrames;
+ mpw.nextFrame(buffer);
+ channel.recycleBuffer(buffer);
+ } else if (eos) {
+ break;
+ } else {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ mpw.close();
+ channel.close();
+ delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
+ new MaterializedPartitionInputChannel(ctx, 5, pid, manager))));
+ } catch (HyracksException e) {
+ }
+ }
+ }
+
+ @Override
+ public IFrameReader getReader() throws HyracksException {
+ return delegate.getReader();
+ }
+
+ @Override
+ public void close() throws HyracksException {
+ delegate.close();
+ }
+
+ @Override
+ public Collection<PartitionId> getRequiredPartitionIds() throws HyracksException {
+ return delegate.getRequiredPartitionIds();
+ }
+
+ @Override
+ public void abort() {
+ delegate.abort();
+ }
+}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java
index 4976230..11a02b3 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractActivityNode.java
@@ -14,22 +14,20 @@
*/
package edu.uci.ics.hyracks.dataflow.std.base;
-import java.util.UUID;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-
-public abstract class AbstractActivityNode implements IActivityNode {
+public abstract class AbstractActivityNode implements IActivity {
private static final long serialVersionUID = 1L;
- protected final ActivityNodeId id;
+ protected final ActivityId id;
- public AbstractActivityNode() {
- this.id = new ActivityNodeId(getOwner().getOperatorId(), UUID.randomUUID());
+ public AbstractActivityNode(ActivityId id) {
+ this.id = id;
}
@Override
- public ActivityNodeId getActivityNodeId() {
+ public ActivityId getActivityId() {
return id;
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
index 018b7b7..c4c3326 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
@@ -14,38 +14,29 @@
*/
package edu.uci.ics.hyracks.dataflow.std.base;
-import java.util.UUID;
-
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
-public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractOperatorDescriptor implements
- IActivityNode {
+public abstract class AbstractSingleActivityOperatorDescriptor extends AbstractOperatorDescriptor implements IActivity {
private static final long serialVersionUID = 1L;
- protected final ActivityNodeId activityNodeId;
+ protected final ActivityId activityNodeId;
public AbstractSingleActivityOperatorDescriptor(JobSpecification spec, int inputArity, int outputArity) {
super(spec, inputArity, outputArity);
- activityNodeId = new ActivityNodeId(odId, UUID.randomUUID());
+ activityNodeId = new ActivityId(odId, 0);
}
@Override
- public ActivityNodeId getActivityNodeId() {
+ public ActivityId getActivityId() {
return activityNodeId;
}
@Override
- public final IOperatorDescriptor getOwner() {
- return this;
- }
-
- @Override
- public final void contributeTaskGraph(IActivityGraphBuilder builder) {
- builder.addTask(this);
+ public final void contributeActivities(IActivityGraphBuilder builder) {
+ builder.addActivity(this);
for (int i = 0; i < getInputArity(); ++i) {
builder.addSourceEdge(i, this, i);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
index 4257083..6d3d891 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/ExternalHashGroupOperatorDescriptor.java
@@ -25,8 +25,8 @@
import edu.uci.ics.hyracks.api.comm.IFrameTupleAccessor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -175,14 +175,14 @@
* (edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder)
*/
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- PartialAggregateActivity partialAggAct = new PartialAggregateActivity();
- MergeActivity mergeAct = new MergeActivity();
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ PartialAggregateActivity partialAggAct = new PartialAggregateActivity(new ActivityId(odId, 0));
+ MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, 1));
- builder.addTask(partialAggAct);
+ builder.addActivity(partialAggAct);
builder.addSourceEdge(0, partialAggAct, 0);
- builder.addTask(mergeAct);
+ builder.addActivity(mergeAct);
builder.addTargetEdge(0, mergeAct, 0);
// FIXME Block or not?
@@ -191,12 +191,12 @@
}
private class PartialAggregateActivity extends AbstractActivityNode {
-
- /**
- *
- */
private static final long serialVersionUID = 1L;
+ public PartialAggregateActivity(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -302,21 +302,15 @@
return op;
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalHashGroupOperatorDescriptor.this;
- }
-
}
private class MergeActivity extends AbstractActivityNode {
-
- /**
- *
- */
private static final long serialVersionUID = 1L;
+ public MergeActivity(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -654,11 +648,6 @@
return op;
}
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalHashGroupOperatorDescriptor.this;
- }
-
private Comparator<ReferenceEntry> createEntryComparator(final IBinaryComparator[] comparators) {
return new Comparator<ReferenceEntry>() {
public int compare(ReferenceEntry tp1, ReferenceEntry tp2) {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 8d92c54..cf954f6 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -17,8 +17,8 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -57,12 +57,12 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- HashBuildActivity ha = new HashBuildActivity();
- builder.addTask(ha);
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, 0));
+ builder.addActivity(ha);
- OutputActivity oa = new OutputActivity();
- builder.addTask(oa);
+ OutputActivity oa = new OutputActivity(new ActivityId(odId, 1));
+ builder.addActivity(oa);
builder.addSourceEdge(0, ha, 0);
builder.addTargetEdge(0, oa, 0);
@@ -72,6 +72,10 @@
private class HashBuildActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
+ public HashBuildActivity(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -106,16 +110,15 @@
}
};
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return HashGroupOperatorDescriptor.this;
- }
}
private class OutputActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
+ public OutputActivity(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -130,10 +133,5 @@
}
};
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return HashGroupOperatorDescriptor.this;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 1e77547..27ae2db 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -17,8 +17,8 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -73,18 +73,20 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(SMALLRELATION, keys0, 0);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(LARGERELATION, keys1, 1);
- JoinActivityNode join = new JoinActivityNode();
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION, keys0,
+ 0);
+ HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, 1), LARGERELATION, keys1,
+ 1);
+ JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, 2));
- builder.addTask(rpart);
+ builder.addActivity(rpart);
builder.addSourceEdge(0, rpart, 0);
- builder.addTask(spart);
+ builder.addActivity(spart);
builder.addSourceEdge(1, spart, 0);
- builder.addTask(join);
+ builder.addActivity(join);
builder.addBlockingEdge(rpart, spart);
builder.addBlockingEdge(spart, join);
@@ -101,7 +103,8 @@
private int operatorInputIndex;
private int keys[];
- public HashPartitionActivityNode(String partitionsKey, int keys[], int operatorInputIndex) {
+ public HashPartitionActivityNode(ActivityId id, String partitionsKey, int keys[], int operatorInputIndex) {
+ super(id);
this.partitionsKey = partitionsKey;
this.keys = keys;
this.operatorInputIndex = operatorInputIndex;
@@ -196,16 +199,15 @@
};
return op;
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return GraceHashJoinOperatorDescriptor.this;
- }
}
private class JoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
+ public JoinActivityNode(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
final IRecordDescriptorProvider recordDescProvider, int partition, final int nPartitions) {
@@ -280,10 +282,5 @@
};
return op;
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return GraceHashJoinOperatorDescriptor.this;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index faa7afb..df7440b 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -17,8 +17,8 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -92,14 +92,14 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(SMALLRELATION);
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(LARGERELATION);
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId, 0), SMALLRELATION);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId, 1), LARGERELATION);
- builder.addTask(phase1);
+ builder.addActivity(phase1);
builder.addSourceEdge(0, phase1, 0);
- builder.addTask(phase2);
+ builder.addActivity(phase2);
builder.addSourceEdge(1, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -111,10 +111,9 @@
private static final long serialVersionUID = 1L;
private String relationName;
- public BuildAndPartitionActivityNode(String relationName) {
- super();
+ public BuildAndPartitionActivityNode(ActivityId id, String relationName) {
+ super(id);
this.relationName = relationName;
-
}
@Override
@@ -288,19 +287,14 @@
};
return op;
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return HybridHashJoinOperatorDescriptor.this;
- }
}
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
private String largeRelation;
- public PartitionAndJoinActivityNode(String relationName) {
- super();
+ public PartitionAndJoinActivityNode(ActivityId id, String relationName) {
+ super(id);
this.largeRelation = relationName;
}
@@ -492,10 +486,5 @@
};
return op;
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return HybridHashJoinOperatorDescriptor.this;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 6f46932..b4797e8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -17,8 +17,8 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -61,14 +61,14 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- HashBuildActivityNode hba = new HashBuildActivityNode();
- HashProbeActivityNode hpa = new HashProbeActivityNode();
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ HashBuildActivityNode hba = new HashBuildActivityNode(new ActivityId(odId, 0));
+ HashProbeActivityNode hpa = new HashProbeActivityNode(new ActivityId(odId, 1));
- builder.addTask(hba);
+ builder.addActivity(hba);
builder.addSourceEdge(0, hba, 0);
- builder.addTask(hpa);
+ builder.addActivity(hpa);
builder.addSourceEdge(1, hpa, 0);
builder.addTargetEdge(0, hpa, 0);
@@ -78,6 +78,10 @@
private class HashBuildActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
+ public HashBuildActivityNode(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -119,16 +123,15 @@
};
return op;
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return InMemoryHashJoinOperatorDescriptor.this;
- }
}
private class HashProbeActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
+ public HashProbeActivityNode(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -160,10 +163,5 @@
};
return op;
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return InMemoryHashJoinOperatorDescriptor.this;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index f4fb240..e8a6eae 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -17,8 +17,8 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -43,14 +43,14 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- MaterializerActivityNode ma = new MaterializerActivityNode();
- ReaderActivityNode ra = new ReaderActivityNode();
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, 0));
+ ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, 1));
- builder.addTask(ma);
+ builder.addActivity(ma);
builder.addSourceEdge(0, ma, 0);
- builder.addTask(ra);
+ builder.addActivity(ra);
builder.addTargetEdge(0, ra, 0);
builder.addBlockingEdge(ma, ra);
@@ -59,6 +59,10 @@
private final class MaterializerActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
+ public MaterializerActivityNode(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -89,16 +93,15 @@
}
};
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return MaterializingOperatorDescriptor.this;
- }
}
private final class ReaderActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
+ public ReaderActivityNode(ActivityId id) {
+ super(id);
+ }
+
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
@@ -125,10 +128,5 @@
}
};
}
-
- @Override
- public IOperatorDescriptor getOwner() {
- return MaterializingOperatorDescriptor.this;
- }
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index a12e49e..8a3addb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -18,10 +18,9 @@
import java.util.List;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
@@ -37,14 +36,13 @@
private class CollectActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- @Override
- public ActivityNodeId getActivityNodeId() {
- return id;
+ public CollectActivity(ActivityId id) {
+ super(id);
}
@Override
- public IOperatorDescriptor getOwner() {
- return SplitVectorOperatorDescriptor.this;
+ public ActivityId getActivityId() {
+ return id;
}
@Override
@@ -82,9 +80,8 @@
private class SplitActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- @Override
- public IOperatorDescriptor getOwner() {
- return SplitVectorOperatorDescriptor.this;
+ public SplitActivity(ActivityId id) {
+ super(id);
}
@Override
@@ -139,14 +136,14 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- CollectActivity ca = new CollectActivity();
- SplitActivity sa = new SplitActivity();
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ CollectActivity ca = new CollectActivity(new ActivityId(odId, 0));
+ SplitActivity sa = new SplitActivity(new ActivityId(odId, 1));
- builder.addTask(ca);
+ builder.addActivity(ca);
builder.addSourceEdge(0, ca, 0);
- builder.addTask(sa);
+ builder.addActivity(sa);
builder.addTargetEdge(0, sa, 0);
builder.addBlockingEdge(ca, sa);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 6871452..3bde4c4 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -19,8 +19,8 @@
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -65,14 +65,14 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- SortActivity sa = new SortActivity();
- MergeActivity ma = new MergeActivity();
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ SortActivity sa = new SortActivity(new ActivityId(odId, 0));
+ MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
- builder.addTask(sa);
+ builder.addActivity(sa);
builder.addSourceEdge(0, sa, 0);
- builder.addTask(ma);
+ builder.addActivity(ma);
builder.addTargetEdge(0, ma, 0);
builder.addBlockingEdge(sa, ma);
@@ -81,9 +81,8 @@
private class SortActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalSortOperatorDescriptor.this;
+ public SortActivity(ActivityId id) {
+ super(id);
}
@Override
@@ -121,9 +120,8 @@
private class MergeActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- @Override
- public IOperatorDescriptor getOwner() {
- return ExternalSortOperatorDescriptor.this;
+ public MergeActivity(ActivityId id) {
+ super(id);
}
@Override
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index afa27ba..d3015b7 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -17,8 +17,8 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
@@ -56,14 +56,14 @@
}
@Override
- public void contributeTaskGraph(IActivityGraphBuilder builder) {
- SortActivity sa = new SortActivity();
- MergeActivity ma = new MergeActivity();
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ SortActivity sa = new SortActivity(new ActivityId(odId, 0));
+ MergeActivity ma = new MergeActivity(new ActivityId(odId, 1));
- builder.addTask(sa);
+ builder.addActivity(sa);
builder.addSourceEdge(0, sa, 0);
- builder.addTask(ma);
+ builder.addActivity(ma);
builder.addTargetEdge(0, ma, 0);
builder.addBlockingEdge(sa, ma);
@@ -72,9 +72,8 @@
private class SortActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- @Override
- public IOperatorDescriptor getOwner() {
- return InMemorySortOperatorDescriptor.this;
+ public SortActivity(ActivityId id) {
+ super(id);
}
@Override
@@ -110,9 +109,8 @@
private class MergeActivity extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- @Override
- public IOperatorDescriptor getOwner() {
- return InMemorySortOperatorDescriptor.this;
+ public MergeActivity(ActivityId id) {
+ super(id);
}
@Override
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
index 71c4a24..dde223b 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
@@ -20,7 +20,7 @@
import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -32,8 +32,8 @@
IHyracksRootContext rootCtx = new TestRootContext(frameSize);
INCApplicationContext appCtx = new TestNCApplicationContext(rootCtx);
IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, UUID.randomUUID());
- TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityNodeId(new OperatorDescriptorId(
- UUID.randomUUID()), UUID.randomUUID()), 0), 0);
+ TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(
+ new OperatorDescriptorId(UUID.randomUUID()), 0), 0), 0);
IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
return taskCtx;
} catch (HyracksException e) {