Cleaned up JobActivityGraph: made the activity graph self-contained.

JobActivityGraph no longer holds a reference to its JobSpecification. The
connector, record-descriptor, and connector-to-activity maps now live on the
graph itself; IRecordDescriptorProvider is keyed by ActivityId instead of
OperatorDescriptorId; IActivityGraphBuilder.addActivity() takes the owning
IOperatorDescriptor; scheduling-constraint collection moves from
JobScheduler.analyze() into JobCreateWork; and GetJobSpecificationJSONWork
and the job-specification views are removed.
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@1635 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index e978ade..5aaf196 100644
--- a/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ b/hyracks-algebricks/hyracks-algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -119,7 +119,7 @@
RecordDescriptor pipelineOutputRecordDescriptor = outputArity > 0 ? AlgebricksMetaOperatorDescriptor.this.recordDescriptors[0]
: null;
RecordDescriptor pipelineInputRecordDescriptor = recordDescProvider.getInputRecordDescriptor(
- AlgebricksMetaOperatorDescriptor.this.getOperatorId(), 0);
+ AlgebricksMetaOperatorDescriptor.this.getActivityId(), 0);
PipelineAssembler pa = new PipelineAssembler(pipeline, inputArity, outputArity,
pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor);
try {
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
index 56870b2..bbda23c 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/IActivityGraphBuilder.java
@@ -15,7 +15,7 @@
package edu.uci.ics.hyracks.api.dataflow;
public interface IActivityGraphBuilder {
- public void addActivity(IActivity task);
+ public void addActivity(IOperatorDescriptor op, IActivity task);
public void addBlockingEdge(IActivity blocker, IActivity blocked);
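
With the extra IOperatorDescriptor parameter, the builder itself records which
operator owns each activity, replacing the graph's old operator-to-activity
bookkeeping. A minimal sketch of how a single-activity operator registers under
the new signature; it mirrors the AbstractSingleActivityOperatorDescriptor
change later in this patch, and the addTargetEdge loop is assumed to follow the
same pattern as the source-edge loop shown there:

    // Sketch: a single-activity operator doubles as its own IActivity,
    // so it passes itself both as owning operator and as activity.
    @Override
    public final void contributeActivities(IActivityGraphBuilder builder) {
        builder.addActivity(this, this);       // owning operator + activity
        for (int i = 0; i < getInputArity(); ++i) {
            builder.addSourceEdge(i, this, i); // operator input i -> activity input i
        }
        for (int i = 0; i < getOutputArity(); ++i) {
            builder.addTargetEdge(i, this, i); // activity output i -> operator output i
        }
    }
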
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
index 96d43a7..c480446 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataflow/value/IRecordDescriptorProvider.java
@@ -14,10 +14,10 @@
*/
package edu.uci.ics.hyracks.api.dataflow.value;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
public interface IRecordDescriptorProvider {
- public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex);
+ public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex);
- public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex);
+ public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex);
}
\ No newline at end of file
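
Keying the provider by ActivityId resolves record descriptors per activity port
rather than per operator port. A sketch of a provider backed by the reworked
JobActivityGraph maps introduced below; this is essentially the anonymous class
that StartTasksWork adopts later in this patch, with `jag` assumed to be a
JobActivityGraph already in scope:

    // Sketch: look up the connector attached at the given port index, then
    // map its id to a RecordDescriptor through the graph's connector map.
    IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
        @Override
        public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
            IConnectorDescriptor conn = jag.getActivityInputMap().get(aid).get(inputIndex);
            return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
        }

        @Override
        public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
            IConnectorDescriptor conn = jag.getActivityOutputMap().get(aid).get(outputIndex);
            return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
        }
    };
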
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
index 2b6d361..3c6a547 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobActivityGraph.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.api.job;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -31,8 +30,7 @@
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
public class JobActivityGraph implements Serializable {
@@ -40,54 +38,63 @@
private final String appName;
- private final JobSpecification jobSpec;
-
private final EnumSet<JobFlag> jobFlags;
- private final Map<ActivityId, IActivity> activityNodes;
+ private final Map<ActivityId, IActivity> activityMap;
+
+ private final Map<ConnectorDescriptorId, IConnectorDescriptor> connectorMap;
+
+ private final Map<ConnectorDescriptorId, RecordDescriptor> connectorRecordDescriptorMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityInputMap;
+
+ private final Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap;
+
+ private final Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap;
private final Map<ActivityId, Set<ActivityId>> blocker2blockedMap;
private final Map<ActivityId, Set<ActivityId>> blocked2blockerMap;
- private final Map<OperatorDescriptorId, Set<ActivityId>> operatorActivityMap;
+ private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
- private final Map<ActivityId, List<Integer>> activityInputMap;
+ private int maxReattempts;
- private final Map<ActivityId, List<Integer>> activityOutputMap;
+ private IJobletEventListenerFactory jobletEventListenerFactory;
- private final Map<OperatorDescriptorId, List<ActivityId>> operatorInputMap;
+ private IGlobalJobDataFactory globalJobDataFactory;
- private final Map<OperatorDescriptorId, List<ActivityId>> operatorOutputMap;
-
- public JobActivityGraph(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+ public JobActivityGraph(String appName, EnumSet<JobFlag> jobFlags) {
this.appName = appName;
- this.jobSpec = jobSpec;
this.jobFlags = jobFlags;
- activityNodes = new HashMap<ActivityId, IActivity>();
+ activityMap = new HashMap<ActivityId, IActivity>();
+ connectorMap = new HashMap<ConnectorDescriptorId, IConnectorDescriptor>();
+ connectorRecordDescriptorMap = new HashMap<ConnectorDescriptorId, RecordDescriptor>();
+ activityInputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ activityOutputMap = new HashMap<ActivityId, List<IConnectorDescriptor>>();
+ connectorActivityMap = new HashMap<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
blocker2blockedMap = new HashMap<ActivityId, Set<ActivityId>>();
blocked2blockerMap = new HashMap<ActivityId, Set<ActivityId>>();
- operatorActivityMap = new HashMap<OperatorDescriptorId, Set<ActivityId>>();
- activityInputMap = new HashMap<ActivityId, List<Integer>>();
- activityOutputMap = new HashMap<ActivityId, List<Integer>>();
- operatorInputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
- operatorOutputMap = new HashMap<OperatorDescriptorId, List<ActivityId>>();
}
public String getApplicationName() {
return appName;
}
- public JobSpecification getJobSpecification() {
- return jobSpec;
- }
-
public EnumSet<JobFlag> getJobFlags() {
return jobFlags;
}
- public Map<ActivityId, IActivity> getActivityNodeMap() {
- return activityNodes;
+ public Map<ActivityId, IActivity> getActivityMap() {
+ return activityMap;
+ }
+
+ public Map<ConnectorDescriptorId, IConnectorDescriptor> getConnectorMap() {
+ return connectorMap;
+ }
+
+ public Map<ConnectorDescriptorId, RecordDescriptor> getConnectorRecordDescriptorMap() {
+ return connectorRecordDescriptorMap;
}
public Map<ActivityId, Set<ActivityId>> getBlocker2BlockedMap() {
@@ -98,106 +105,64 @@
return blocked2blockerMap;
}
- public Map<OperatorDescriptorId, Set<ActivityId>> getOperatorActivityMap() {
- return operatorActivityMap;
- }
-
- public Map<ActivityId, List<Integer>> getActivityInputMap() {
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityInputMap() {
return activityInputMap;
}
- public Map<ActivityId, List<Integer>> getActivityOutputMap() {
+ public Map<ActivityId, List<IConnectorDescriptor>> getActivityOutputMap() {
return activityOutputMap;
}
- public Map<OperatorDescriptorId, List<ActivityId>> getOperatorInputMap() {
- return operatorInputMap;
- }
-
- public Map<OperatorDescriptorId, List<ActivityId>> getOperatorOutputMap() {
- return operatorOutputMap;
- }
-
- public List<IConnectorDescriptor> getActivityInputConnectorDescriptors(ActivityId hanId) {
- List<Integer> inputIndexes = activityInputMap.get(hanId);
- if (inputIndexes == null) {
- return null;
- }
- OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
- List<IConnectorDescriptor> inputs = new ArrayList<IConnectorDescriptor>();
- for (Integer i : inputIndexes) {
- inputs.add(jobSpec.getInputConnectorDescriptor(ownerId, i));
- }
- return inputs;
- }
-
- public List<IConnectorDescriptor> getActivityOutputConnectorDescriptors(ActivityId hanId) {
- List<Integer> outputIndexes = activityOutputMap.get(hanId);
- if (outputIndexes == null) {
- return null;
- }
- OperatorDescriptorId ownerId = hanId.getOperatorDescriptorId();
- List<IConnectorDescriptor> outputs = new ArrayList<IConnectorDescriptor>();
- for (Integer i : outputIndexes) {
- outputs.add(jobSpec.getOutputConnectorDescriptor(ownerId, i));
- }
- return outputs;
+ public Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> getConnectorActivityMap() {
+ return connectorActivityMap;
}
public ActivityId getConsumerActivity(ConnectorDescriptorId cdId) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
- .getConnectorOperatorMap().get(cdId);
-
- OperatorDescriptorId consumerOpId = connEdge.getRight().getLeft().getOperatorId();
- int consumerInputIdx = connEdge.getRight().getRight();
-
- for (ActivityId anId : operatorActivityMap.get(consumerOpId)) {
- List<Integer> anInputs = activityInputMap.get(anId);
- if (anInputs != null) {
- for (Integer idx : anInputs) {
- if (idx.intValue() == consumerInputIdx) {
- return anId;
- }
- }
- }
- }
- return null;
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getRight().getLeft().getActivityId();
}
public ActivityId getProducerActivity(ConnectorDescriptorId cdId) {
- Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>> connEdge = jobSpec
- .getConnectorOperatorMap().get(cdId);
-
- OperatorDescriptorId producerOpId = connEdge.getLeft().getLeft().getOperatorId();
- int producerInputIdx = connEdge.getLeft().getRight();
-
- for (ActivityId anId : operatorActivityMap.get(producerOpId)) {
- List<Integer> anOutputs = activityOutputMap.get(anId);
- if (anOutputs != null) {
- for (Integer idx : anOutputs) {
- if (idx.intValue() == producerInputIdx) {
- return anId;
- }
- }
- }
- }
- return null;
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> connEdge = connectorActivityMap.get(cdId);
+ return connEdge.getLeft().getLeft().getActivityId();
}
- public RecordDescriptor getActivityInputRecordDescriptor(ActivityId hanId, int inputIndex) {
- int opInputIndex = getActivityInputMap().get(hanId).get(inputIndex);
- return jobSpec.getOperatorInputRecordDescriptor(hanId.getOperatorDescriptorId(), opInputIndex);
+ public IConnectorPolicyAssignmentPolicy getConnectorPolicyAssignmentPolicy() {
+ return connectorPolicyAssignmentPolicy;
}
- public RecordDescriptor getActivityOutputRecordDescriptor(ActivityId hanId, int outputIndex) {
- int opOutputIndex = getActivityOutputMap().get(hanId).get(outputIndex);
- return jobSpec.getOperatorOutputRecordDescriptor(hanId.getOperatorDescriptorId(), opOutputIndex);
+ public void setConnectorPolicyAssignmentPolicy(IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy) {
+ this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
+ }
+
+ public void setMaxReattempts(int maxReattempts) {
+ this.maxReattempts = maxReattempts;
+ }
+
+ public int getMaxReattempts() {
+ return maxReattempts;
+ }
+
+ public IJobletEventListenerFactory getJobletEventListenerFactory() {
+ return jobletEventListenerFactory;
+ }
+
+ public void setJobletEventListenerFactory(IJobletEventListenerFactory jobletEventListenerFactory) {
+ this.jobletEventListenerFactory = jobletEventListenerFactory;
+ }
+
+ public IGlobalJobDataFactory getGlobalJobDataFactory() {
+ return globalJobDataFactory;
+ }
+
+ public void setGlobalJobDataFactory(IGlobalJobDataFactory globalJobDataFactory) {
+ this.globalJobDataFactory = globalJobDataFactory;
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
- buffer.append("ActivityNodes: " + activityNodes);
+ buffer.append("ActivityNodes: " + activityMap);
buffer.append('\n');
buffer.append("Blocker->Blocked: " + blocker2blockedMap);
buffer.append('\n');
@@ -212,13 +177,12 @@
jplan.put("flags", jobFlags.toString());
JSONArray jans = new JSONArray();
- for (IActivity an : activityNodes.values()) {
+ for (IActivity an : activityMap.values()) {
JSONObject jan = new JSONObject();
jan.put("id", an.getActivityId().toString());
jan.put("java-class", an.getClass().getName());
- jan.put("operator-id", an.getActivityId().getOperatorDescriptorId().toString());
- List<IConnectorDescriptor> inputs = getActivityInputConnectorDescriptors(an.getActivityId());
+ List<IConnectorDescriptor> inputs = activityInputMap.get(an.getActivityId());
if (inputs != null) {
JSONArray jInputs = new JSONArray();
for (int i = 0; i < inputs.size(); ++i) {
@@ -230,7 +194,7 @@
jan.put("inputs", jInputs);
}
- List<IConnectorDescriptor> outputs = getActivityOutputConnectorDescriptors(an.getActivityId());
+ List<IConnectorDescriptor> outputs = activityOutputMap.get(an.getActivityId());
if (outputs != null) {
JSONArray jOutputs = new JSONArray();
for (int i = 0; i < outputs.size(); ++i) {
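
The new connectorActivityMap turns producer/consumer lookup into a single map
access, where the old code scanned the operator-activity maps through the
JobSpecification. A usage sketch, assuming cdId is a ConnectorDescriptorId
present in the graph:

    // Sketch: each connector edge stores (producer activity, output port) on
    // the left and (consumer activity, input port) on the right.
    Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> edge =
            jag.getConnectorActivityMap().get(cdId);
    IActivity producer = edge.getLeft().getLeft();   // writing activity
    int producerPort = edge.getLeft().getRight();    // its output index
    IActivity consumer = edge.getRight().getLeft();  // reading activity
    int consumerPort = edge.getRight().getRight();   // its input index
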
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
index 1d96a71..a486c33 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/adminconsole/pages/JobDetailsPage.java
@@ -35,7 +35,6 @@
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
-import edu.uci.ics.hyracks.control.cc.work.GetJobSpecificationJSONWork;
public class JobDetailsPage extends AbstractPage {
private static final long serialVersionUID = 1L;
@@ -49,12 +48,6 @@
JobId jobId = JobId.parse(jobIdStr.toString());
- GetJobSpecificationJSONWork gjsw = new GetJobSpecificationJSONWork(ccs, jobId);
- ccs.getWorkQueue().scheduleAndSync(gjsw);
- Label jobspec = new Label("job-specification", gjsw.getJSON().toString());
- jobspec.setEscapeModelStrings(false);
- add(jobspec);
-
GetJobActivityGraphJSONWork gjagw = new GetJobActivityGraphJSONWork(ccs, jobId);
ccs.getWorkQueue().scheduleAndSync(gjagw);
Label jag = new Label("job-activity-graph", gjagw.getJSON().toString());
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index f5b74cf..d351d4a 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -71,4 +71,9 @@
public void setPlan(ActivityClusterPlan acp) {
this.acp = acp;
}
+
+ @Override
+ public String toString() {
+ return String.valueOf(activities);
+ }
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
index 6effd84..a7b60e4 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/JobActivityGraphBuilder.java
@@ -2,6 +2,7 @@
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -9,9 +10,14 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.commons.lang3.tuple.Pair;
+
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IActivityGraphBuilder;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobSpecification;
@@ -19,7 +25,32 @@
public class JobActivityGraphBuilder implements IActivityGraphBuilder {
private static final Logger LOGGER = Logger.getLogger(JobActivityGraphBuilder.class.getName());
- private JobActivityGraph jag;
+ private final Map<ActivityId, IOperatorDescriptor> activityOperatorMap;
+
+ private final JobActivityGraph jag;
+
+ private final JobSpecification jobSpec;
+
+ private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorProducerMap;
+
+ private final Map<ConnectorDescriptorId, Pair<IActivity, Integer>> connectorConsumerMap;
+
+ public JobActivityGraphBuilder(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
+ activityOperatorMap = new HashMap<ActivityId, IOperatorDescriptor>();
+ jag = new JobActivityGraph(appName, jobFlags);
+ this.jobSpec = jobSpec;
+ jag.setConnectorPolicyAssignmentPolicy(jobSpec.getConnectorPolicyAssignmentPolicy());
+ jag.setGlobalJobDataFactory(jobSpec.getGlobalJobDataFactory());
+ jag.setJobletEventListenerFactory(jobSpec.getJobletEventListenerFactory());
+ jag.setMaxReattempts(jobSpec.getMaxReattempts());
+ connectorProducerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+ connectorConsumerMap = new HashMap<ConnectorDescriptorId, Pair<IActivity, Integer>>();
+ }
+
+ public void addConnector(IConnectorDescriptor conn) {
+ jag.getConnectorMap().put(conn.getConnectorId(), conn);
+ jag.getConnectorRecordDescriptorMap().put(conn.getConnectorId(), jobSpec.getConnectorRecordDescriptor(conn));
+ }
@Override
public void addBlockingEdge(IActivity blocker, IActivity blocked) {
@@ -33,9 +64,10 @@
LOGGER.finest("Adding source edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
+ operatorInputIndex + " -> " + task.getActivityId() + ":" + taskInputIndex);
}
- insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, operatorInputIndex);
- insertIntoIndexedMap(jag.getOperatorInputMap(), task.getActivityId().getOperatorDescriptorId(),
- operatorInputIndex, task.getActivityId());
+ IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
+ IConnectorDescriptor conn = jobSpec.getInputConnectorDescriptor(op, operatorInputIndex);
+ insertIntoIndexedMap(jag.getActivityInputMap(), task.getActivityId(), taskInputIndex, conn);
+ connectorConsumerMap.put(conn.getConnectorId(), Pair.of(task, taskInputIndex));
}
@Override
@@ -44,16 +76,28 @@
LOGGER.finest("Adding target edge: " + task.getActivityId().getOperatorDescriptorId() + ":"
+ operatorOutputIndex + " -> " + task.getActivityId() + ":" + taskOutputIndex);
}
- insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, operatorOutputIndex);
- insertIntoIndexedMap(jag.getOperatorOutputMap(), task.getActivityId().getOperatorDescriptorId(),
- operatorOutputIndex, task.getActivityId());
+ IOperatorDescriptor op = activityOperatorMap.get(task.getActivityId());
+ IConnectorDescriptor conn = jobSpec.getOutputConnectorDescriptor(op, operatorOutputIndex);
+ insertIntoIndexedMap(jag.getActivityOutputMap(), task.getActivityId(), taskOutputIndex, conn);
+ connectorProducerMap.put(conn.getConnectorId(), Pair.of(task, taskOutputIndex));
}
@Override
- public void addActivity(IActivity task) {
+ public void addActivity(IOperatorDescriptor op, IActivity task) {
+ activityOperatorMap.put(task.getActivityId(), op);
ActivityId activityId = task.getActivityId();
- jag.getActivityNodeMap().put(activityId, task);
- addToValueSet(jag.getOperatorActivityMap(), activityId.getOperatorDescriptorId(), activityId);
+ jag.getActivityMap().put(activityId, task);
+ }
+
+ public void finish() {
+ Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> caMap = jag
+ .getConnectorActivityMap();
+ for (Map.Entry<ConnectorDescriptorId, Pair<IActivity, Integer>> e : connectorProducerMap.entrySet()) {
+ ConnectorDescriptorId cdId = e.getKey();
+ Pair<IActivity, Integer> producer = e.getValue();
+ Pair<IActivity, Integer> consumer = connectorConsumerMap.get(cdId);
+ caMap.put(cdId, Pair.of(producer, consumer));
+ }
}
private <K, V> void addToValueSet(Map<K, Set<V>> map, K n1, V n2) {
@@ -72,10 +116,6 @@
}
}
- public void init(String appName, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags) {
- jag = new JobActivityGraph(appName, jobSpec, jobFlags);
- }
-
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int index, V value) {
List<V> vList = map.get(key);
if (vList == null) {
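
The builder now has an explicit lifecycle: construct it with the specification,
register every connector, let each operator contribute its activities and
edges, then call finish() to stitch the recorded producer and consumer halves
into the graph's connectorActivityMap. A condensed sketch of that driving
sequence, as JobCreateWork performs it later in this patch:

    // Sketch of the new build sequence (condensed from JobCreateWork below).
    final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(appName, spec, jobFlags);
    PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
        @Override
        public void visit(IConnectorDescriptor conn) throws HyracksException {
            builder.addConnector(conn);       // connector + its record descriptor
        }
    });
    PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
        @Override
        public void visit(IOperatorDescriptor op) {
            op.contributeActivities(builder); // activities + source/target edges
        }
    });
    builder.finish();                         // pair producers with consumers
    JobActivityGraph jag = builder.getActivityGraph();
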
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
index 9d6aa71..9acc4b8 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterGraphBuilder.java
@@ -25,11 +25,8 @@
import org.apache.commons.lang3.tuple.Pair;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
-import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.cc.job.ActivityClusterId;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -43,33 +40,22 @@
this.jobRun = jobRun;
}
- private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, JobSpecification spec,
- Set<ActivityCluster> eqSets) {
- Map<ActivityId, IActivity> activityNodeMap = jag.getActivityNodeMap();
+ private static Pair<ActivityId, ActivityId> findMergePair(JobActivityGraph jag, Set<ActivityCluster> eqSets) {
for (ActivityCluster eqSet : eqSets) {
for (ActivityId t : eqSet.getActivities()) {
- IActivity activity = activityNodeMap.get(t);
- List<Integer> inputList = jag.getActivityInputMap().get(t);
+ List<IConnectorDescriptor> inputList = jag.getActivityInputMap().get(t);
if (inputList != null) {
- for (Integer idx : inputList) {
- IConnectorDescriptor conn = spec.getInputConnectorDescriptor(activity.getActivityId()
- .getOperatorDescriptorId(), idx);
- OperatorDescriptorId producerId = spec.getProducer(conn).getOperatorId();
- int producerOutputIndex = spec.getProducerOutputIndex(conn);
- ActivityId inTask = jag.getOperatorOutputMap().get(producerId).get(producerOutputIndex);
+ for (IConnectorDescriptor conn : inputList) {
+ ActivityId inTask = jag.getProducerActivity(conn.getConnectorId());
if (!eqSet.getActivities().contains(inTask)) {
return Pair.<ActivityId, ActivityId> of(t, inTask);
}
}
}
- List<Integer> outputList = jag.getActivityOutputMap().get(t);
+ List<IConnectorDescriptor> outputList = jag.getActivityOutputMap().get(t);
if (outputList != null) {
- for (Integer idx : outputList) {
- IConnectorDescriptor conn = spec.getOutputConnectorDescriptor(activity.getActivityId()
- .getOperatorDescriptorId(), idx);
- OperatorDescriptorId consumerId = spec.getConsumer(conn).getOperatorId();
- int consumerInputIndex = spec.getConsumerInputIndex(conn);
- ActivityId outTask = jag.getOperatorInputMap().get(consumerId).get(consumerInputIndex);
+ for (IConnectorDescriptor conn : outputList) {
+ ActivityId outTask = jag.getConsumerActivity(conn.getConnectorId());
if (!eqSet.getActivities().contains(outTask)) {
return Pair.<ActivityId, ActivityId> of(t, outTask);
}
@@ -81,27 +67,23 @@
}
public Set<ActivityCluster> inferActivityClusters(JobActivityGraph jag) {
- JobSpecification spec = jag.getJobSpecification();
-
/*
* Build initial equivalence sets map. We create a map such that for each IOperatorTask, t -> { t }
*/
Map<ActivityId, ActivityCluster> stageMap = new HashMap<ActivityId, ActivityCluster>();
Set<ActivityCluster> stages = new HashSet<ActivityCluster>();
- for (Set<ActivityId> taskIds : jag.getOperatorActivityMap().values()) {
- for (ActivityId taskId : taskIds) {
- Set<ActivityId> eqSet = new HashSet<ActivityId>();
- eqSet.add(taskId);
- ActivityCluster stage = new ActivityCluster(eqSet);
- stageMap.put(taskId, stage);
- stages.add(stage);
- }
+ for (ActivityId taskId : jag.getActivityMap().keySet()) {
+ Set<ActivityId> eqSet = new HashSet<ActivityId>();
+ eqSet.add(taskId);
+ ActivityCluster stage = new ActivityCluster(eqSet);
+ stageMap.put(taskId, stage);
+ stages.add(stage);
}
boolean changed = true;
while (changed) {
changed = false;
- Pair<ActivityId, ActivityId> pair = findMergePair(jag, spec, stages);
+ Pair<ActivityId, ActivityId> pair = findMergePair(jag, stages);
if (pair != null) {
merge(stageMap, stages, pair.getLeft(), pair.getRight());
changed = true;
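
findMergePair() now follows connectors straight out of the activity
input/output maps, but the clustering algorithm is unchanged: seed one
singleton cluster per activity, then merge any two clusters joined by a
connector until a fixpoint is reached. A self-contained toy model of that
fixpoint over plain string sets; the activity names and edges are illustrative,
not taken from the patch:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Toy model of inferActivityClusters(): singleton clusters merged to a
    // fixpoint along (producer, consumer) edges.
    public class ClusterMergeSketch {
        public static void main(String[] args) {
            List<String[]> connectors = Arrays.asList(
                    new String[] { "A", "B" },   // producer -> consumer
                    new String[] { "B", "C" },
                    new String[] { "D", "E" });
            Map<String, Set<String>> clusterOf = new HashMap<String, Set<String>>();
            for (String[] conn : connectors) {
                for (String activity : conn) {
                    if (!clusterOf.containsKey(activity)) {
                        Set<String> singleton = new HashSet<String>();
                        singleton.add(activity);
                        clusterOf.put(activity, singleton);
                    }
                }
            }
            boolean changed = true;
            while (changed) {                    // same fixpoint loop as the builder
                changed = false;
                for (String[] conn : connectors) {
                    Set<String> c1 = clusterOf.get(conn[0]);
                    Set<String> c2 = clusterOf.get(conn[1]);
                    if (c1 != c2) {              // connector crosses clusters: merge
                        c1.addAll(c2);
                        for (String moved : c2) {
                            clusterOf.put(moved, c1);
                        }
                        changed = true;
                    }
                }
            }
            // Prints two clusters: {A, B, C} and {D, E}.
            System.out.println(new HashSet<Set<String>>(clusterOf.values()));
        }
    }

Reference equality (c1 != c2) suffices to detect a crossing connector, because
every activity in a merged cluster is remapped to the same Set instance.
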
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
index d04bf81..23f53f4 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/ActivityClusterPlanner.java
@@ -184,7 +184,7 @@
for (ActivityId ac1 : activities) {
Task[] ac1TaskStates = activityPlanMap.get(ac1).getTasks();
int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(ac1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
@@ -334,7 +334,7 @@
for (ActivityId a1 : activities) {
Task[] ac1TaskStates = taskMap.get(a1).getTasks();
int nProducers = ac1TaskStates.length;
- List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(a1);
+ List<IConnectorDescriptor> outputConns = jag.getActivityOutputMap().get(a1);
if (outputConns != null) {
for (IConnectorDescriptor c : outputConns) {
ConnectorDescriptorId cdId = c.getConnectorId();
@@ -356,7 +356,7 @@
}
private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
- IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph().getJobSpecification()
+ IConnectorPolicyAssignmentPolicy cpap = scheduler.getJobRun().getJobActivityGraph()
.getConnectorPolicyAssignmentPolicy();
if (cpap != null) {
return cpap.getConnectorPolicyAssignment(c, nProducers, nConsumers, fanouts);
@@ -368,6 +368,7 @@
throws HyracksException {
PartitionConstraintSolver solver = scheduler.getSolver();
JobRun jobRun = scheduler.getJobRun();
+ JobActivityGraph jag = jobRun.getJobActivityGraph();
Set<LValueConstraintExpression> lValues = new HashSet<LValueConstraintExpression>();
for (ActivityId anId : ac.getActivities()) {
lValues.add(new PartitionCountExpression(anId.getOperatorDescriptorId()));
@@ -393,22 +394,25 @@
for (ActivityId anId : ac.getActivities()) {
int nParts = nPartMap.get(anId.getOperatorDescriptorId());
int[] nInputPartitions = null;
- List<IConnectorDescriptor> inputs = jobRun.getJobActivityGraph().getActivityInputConnectorDescriptors(anId);
+ List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(anId);
if (inputs != null) {
nInputPartitions = new int[inputs.size()];
for (int i = 0; i < nInputPartitions.length; ++i) {
- nInputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
- .getProducerActivity(inputs.get(i).getConnectorId()).getOperatorDescriptorId());
+ ConnectorDescriptorId cdId = inputs.get(i).getConnectorId();
+ ActivityId aid = jag.getProducerActivity(cdId);
+ Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
+ nInputPartitions[i] = nPartInt;
}
}
int[] nOutputPartitions = null;
- List<IConnectorDescriptor> outputs = jobRun.getJobActivityGraph().getActivityOutputConnectorDescriptors(
- anId);
+ List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(anId);
if (outputs != null) {
nOutputPartitions = new int[outputs.size()];
for (int i = 0; i < nOutputPartitions.length; ++i) {
- nOutputPartitions[i] = nPartMap.get(jobRun.getJobActivityGraph()
- .getConsumerActivity(outputs.get(i).getConnectorId()).getOperatorDescriptorId());
+ ConnectorDescriptorId cdId = outputs.get(i).getConnectorId();
+ ActivityId aid = jag.getConsumerActivity(cdId);
+ Integer nPartInt = nPartMap.get(aid.getOperatorDescriptorId());
+ nOutputPartitions[i] = nPartInt;
}
}
ActivityPartitionDetails apd = new ActivityPartitionDetails(nParts, nInputPartitions, nOutputPartitions);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
index 4a315ab..b3a76ad 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/JobScheduler.java
@@ -15,6 +15,7 @@
package edu.uci.ics.hyracks.control.cc.scheduler;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -25,32 +26,24 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import edu.uci.ics.hyracks.api.application.ICCApplicationContext;
import edu.uci.ics.hyracks.api.constraints.Constraint;
-import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
-import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.api.util.JavaSerializationUtils;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
-import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
-import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.cc.job.PlanUtils;
import edu.uci.ics.hyracks.control.cc.job.Task;
import edu.uci.ics.hyracks.control.cc.job.TaskAttempt;
import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
@@ -75,12 +68,15 @@
private Set<ActivityCluster> rootActivityClusters;
- public JobScheduler(ClusterControllerService ccs, JobRun jobRun) {
+ public JobScheduler(ClusterControllerService ccs, JobRun jobRun, Collection<Constraint> constraints) {
this.ccs = ccs;
this.jobRun = jobRun;
solver = new PartitionConstraintSolver();
partitionProducingTaskClusterMap = new HashMap<PartitionId, TaskCluster>();
inProgressTaskClusters = new HashSet<TaskCluster>();
+ solver.addConstraints(constraints);
+ ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
+ rootActivityClusters = acgb.inferActivityClusters(jobRun.getJobActivityGraph());
}
public JobRun getJobRun() {
@@ -92,43 +88,9 @@
}
public void startJob() throws HyracksException {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Starting Job: " + jobRun.getJobActivityGraph().getJobSpecification());
- }
- analyze();
startRunnableActivityClusters();
}
- private void analyze() throws HyracksException {
- final JobActivityGraph jag = jobRun.getJobActivityGraph();
- final ICCApplicationContext appCtx = ccs.getApplicationMap().get(jag.getApplicationName());
- JobSpecification spec = jag.getJobSpecification();
- final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
- final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
- @Override
- public void addConstraint(Constraint constraint) {
- contributedConstraints.add(constraint);
- }
- };
- PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
- @Override
- public void visit(IOperatorDescriptor op) {
- op.contributeSchedulingConstraints(acceptor, jag, appCtx);
- }
- });
- PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
- @Override
- public void visit(IConnectorDescriptor conn) {
- conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
- }
- });
- contributedConstraints.addAll(spec.getUserConstraints());
- solver.addConstraints(contributedConstraints);
-
- ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder(jobRun);
- rootActivityClusters = acgb.inferActivityClusters(jag);
- }
-
private void findRunnableTaskClusterRoots(Set<TaskCluster> frontier, Set<ActivityCluster> roots)
throws HyracksException {
for (ActivityCluster root : roots) {
@@ -644,7 +606,7 @@
lastAttempt.setStatus(TaskClusterAttempt.TaskClusterStatus.FAILED);
lastAttempt.setEndTime(System.currentTimeMillis());
abortDoomedTaskClusters();
- if (lastAttempt.getAttempt() >= jobRun.getJobActivityGraph().getJobSpecification().getMaxReattempts()) {
+ if (lastAttempt.getAttempt() >= jobRun.getJobActivityGraph().getMaxReattempts()) {
abortJob(new HyracksException(details));
return;
}
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
index 4135b18..c964e4e 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/web/JobsRESTAPIFunction.java
@@ -21,7 +21,6 @@
import edu.uci.ics.hyracks.control.cc.web.util.IJSONOutputFunction;
import edu.uci.ics.hyracks.control.cc.work.GetJobActivityGraphJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobRunJSONWork;
-import edu.uci.ics.hyracks.control.cc.work.GetJobSpecificationJSONWork;
import edu.uci.ics.hyracks.control.cc.work.GetJobSummariesJSONWork;
public class JobsRESTAPIFunction implements IJSONOutputFunction {
@@ -49,11 +48,7 @@
case 2: {
JobId jobId = JobId.parse(arguments[0]);
- if ("job-specification".equalsIgnoreCase(arguments[1])) {
- GetJobSpecificationJSONWork gjse = new GetJobSpecificationJSONWork(ccs, jobId);
- ccs.getWorkQueue().scheduleAndSync(gjse);
- result.put("result", gjse.getJSON());
- } else if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
+ if ("job-activity-graph".equalsIgnoreCase(arguments[1])) {
GetJobActivityGraphJSONWork gjage = new GetJobActivityGraphJSONWork(ccs, jobId);
ccs.getWorkQueue().scheduleAndSync(gjage);
result.put("result", gjage.getJSON());
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java
deleted file mode 100644
index 05b7cae..0000000
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetJobSpecificationJSONWork.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import org.json.JSONObject;
-
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.cc.job.JobRun;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetJobSpecificationJSONWork extends SynchronizableWork {
- private final ClusterControllerService ccs;
- private final JobId jobId;
- private JSONObject json;
-
- public GetJobSpecificationJSONWork(ClusterControllerService ccs, JobId jobId) {
- this.ccs = ccs;
- this.jobId = jobId;
- }
-
- @Override
- protected void doRun() throws Exception {
- JobRun run = ccs.getActiveRunMap().get(jobId);
- if (run == null) {
- run = ccs.getRunMapArchive().get(jobId);
- if (run == null) {
- json = new JSONObject();
- return;
- }
- }
- json = run.getJobActivityGraph().getJobSpecification().toJSON();
- }
-
- public JSONObject getJSON() {
- return json;
- }
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
index 07088bb..8d8e85d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCleanupWork.java
@@ -17,10 +17,16 @@
import java.util.Set;
import java.util.logging.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.job.JobActivityGraph;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.NodeControllerState;
+import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
@@ -53,13 +59,44 @@
Set<String> targetNodes = run.getParticipatingNodeIds();
run.getCleanupPendingNodeIds().addAll(targetNodes);
run.setPendingStatus(status, exception);
- for (String n : targetNodes) {
- NodeControllerState ncs = ccs.getNodeMap().get(n);
+ if (targetNodes != null && !targetNodes.isEmpty()) {
+ for (String n : targetNodes) {
+ NodeControllerState ncs = ccs.getNodeMap().get(n);
+ try {
+ ncs.getNodeController().cleanUpJoblet(jobId, status);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ } else {
+ CCApplicationContext appCtx = ccs.getApplicationMap().get(run.getJobActivityGraph().getApplicationName());
+ if (appCtx != null) {
+ try {
+ appCtx.notifyJobFinish(jobId);
+ } catch (HyracksException e) {
+ e.printStackTrace();
+ }
+ }
+ run.setStatus(run.getPendingStatus(), run.getPendingException());
+ ccs.getActiveRunMap().remove(jobId);
+ ccs.getRunMapArchive().put(jobId, run);
try {
- ncs.getNodeController().cleanUpJoblet(jobId, status);
+ ccs.getJobLogFile().log(createJobLogObject(run));
} catch (Exception e) {
- e.printStackTrace();
+ throw new RuntimeException(e);
}
}
}
+
+ private JSONObject createJobLogObject(final JobRun run) {
+ JSONObject jobLogObject = new JSONObject();
+ try {
+ JobActivityGraph jag = run.getJobActivityGraph();
+ jobLogObject.put("job-activity-graph", jag.toJSON());
+ jobLogObject.put("job-run", run.toJSON());
+ } catch (JSONException e) {
+ throw new RuntimeException(e);
+ }
+ return jobLogObject;
+ }
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
index b7cf629..ea8cef3 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobCreateWork.java
@@ -15,7 +15,12 @@
package edu.uci.ics.hyracks.control.cc.work;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+import edu.uci.ics.hyracks.api.constraints.Constraint;
+import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
@@ -25,6 +30,7 @@
import edu.uci.ics.hyracks.api.job.JobStatus;
import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
+import edu.uci.ics.hyracks.control.cc.job.IConnectorDescriptorVisitor;
import edu.uci.ics.hyracks.control.cc.job.IOperatorDescriptorVisitor;
import edu.uci.ics.hyracks.control.cc.job.JobActivityGraphBuilder;
import edu.uci.ics.hyracks.control.cc.job.JobRun;
@@ -54,20 +60,26 @@
@Override
protected void doRun() throws Exception {
try {
- CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
+ final CCApplicationContext appCtx = ccs.getApplicationMap().get(appName);
if (appCtx == null) {
throw new HyracksException("No application with id " + appName + " found");
}
JobSpecification spec = appCtx.createJobSpecification(jobSpec);
- final JobActivityGraphBuilder builder = new JobActivityGraphBuilder();
- builder.init(appName, spec, jobFlags);
+ final JobActivityGraphBuilder builder = new JobActivityGraphBuilder(appName, spec, jobFlags);
+ PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+ @Override
+ public void visit(IConnectorDescriptor conn) throws HyracksException {
+ builder.addConnector(conn);
+ }
+ });
PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
@Override
public void visit(IOperatorDescriptor op) {
op.contributeActivities(builder);
}
});
+ builder.finish();
final JobActivityGraph jag = builder.getActivityGraph();
JobRun run = new JobRun(jobId, jag);
@@ -75,7 +87,28 @@
run.setStatus(JobStatus.INITIALIZED, null);
ccs.getActiveRunMap().put(jobId, run);
- JobScheduler jrs = new JobScheduler(ccs, run);
+ final Set<Constraint> contributedConstraints = new HashSet<Constraint>();
+ final IConstraintAcceptor acceptor = new IConstraintAcceptor() {
+ @Override
+ public void addConstraint(Constraint constraint) {
+ contributedConstraints.add(constraint);
+ }
+ };
+ PlanUtils.visit(spec, new IOperatorDescriptorVisitor() {
+ @Override
+ public void visit(IOperatorDescriptor op) {
+ op.contributeSchedulingConstraints(acceptor, jag, appCtx);
+ }
+ });
+ PlanUtils.visit(spec, new IConnectorDescriptorVisitor() {
+ @Override
+ public void visit(IConnectorDescriptor conn) {
+ conn.contributeSchedulingConstraints(acceptor, jag, appCtx);
+ }
+ });
+ contributedConstraints.addAll(spec.getUserConstraints());
+
+ JobScheduler jrs = new JobScheduler(ccs, run, contributedConstraints);
run.setScheduler(jrs);
appCtx.notifyJobCreation(jobId, spec);
callback.setValue(jobId);
diff --git a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 80ab5ec..dea4a8d 100644
--- a/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -83,7 +83,6 @@
JSONObject jobLogObject = new JSONObject();
try {
JobActivityGraph jag = run.getJobActivityGraph();
- jobLogObject.put("job-specification", jag.getJobSpecification().toJSON());
jobLogObject.put("job-activity-graph", jag.toJSON());
jobLogObject.put("job-run", run.toJSON());
} catch (JSONException e) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index ec7cf89..10c9c3d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -97,7 +97,7 @@
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(this, (IOManager) appCtx.getRootContext().getIOManager());
cleanupPending = false;
- IJobletEventListenerFactory jelf = jag.getJobSpecification().getJobletEventListenerFactory();
+ IJobletEventListenerFactory jelf = jag.getJobletEventListenerFactory();
if (jelf != null) {
IJobletEventListener listener = jelf.createListener(this);
this.jobletEventListener = listener;
@@ -105,7 +105,7 @@
} else {
jobletEventListener = null;
}
- IGlobalJobDataFactory gjdf = jag.getJobSpecification().getGlobalJobDataFactory();
+ IGlobalJobDataFactory gjdf = jag.getGlobalJobDataFactory();
globalJobData = gjdf != null ? gjdf.createGlobalJobData(this) : null;
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 78101bf..8d1b2a6 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -26,11 +26,11 @@
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.IPartitionWriterFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.IActivity;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
-import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.TaskId;
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
@@ -90,20 +90,22 @@
IRecordDescriptorProvider rdp = new IRecordDescriptorProvider() {
@Override
- public RecordDescriptor getOutputRecordDescriptor(OperatorDescriptorId opId, int outputIndex) {
- return jag.getJobSpecification().getOperatorOutputRecordDescriptor(opId, outputIndex);
+ public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+ IConnectorDescriptor conn = jag.getActivityOutputMap().get(aid).get(outputIndex);
+ return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
}
@Override
- public RecordDescriptor getInputRecordDescriptor(OperatorDescriptorId opId, int inputIndex) {
- return jag.getJobSpecification().getOperatorInputRecordDescriptor(opId, inputIndex);
+ public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+ IConnectorDescriptor conn = jag.getActivityInputMap().get(aid).get(inputIndex);
+ return jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
}
};
for (TaskAttemptDescriptor td : taskDescriptors) {
TaskAttemptId taId = td.getTaskAttemptId();
TaskId tid = taId.getTaskId();
- IActivity han = jag.getActivityNodeMap().get(tid.getActivityId());
+ IActivity han = jag.getActivityMap().get(tid.getActivityId());
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Initializing " + taId + " -> " + han);
}
@@ -113,7 +115,7 @@
List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
- List<IConnectorDescriptor> inputs = jag.getActivityInputConnectorDescriptors(tid.getActivityId());
+ List<IConnectorDescriptor> inputs = jag.getActivityInputMap().get(tid.getActivityId());
if (inputs != null) {
for (int i = 0; i < inputs.size(); ++i) {
IConnectorDescriptor conn = inputs.get(i);
@@ -121,17 +123,17 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("input: " + i + ": " + conn.getConnectorId());
}
- RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
+ RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
IPartitionCollector collector = createPartitionCollector(td, partition, task, i, conn,
recordDesc, cPolicy);
collectors.add(collector);
}
}
- List<IConnectorDescriptor> outputs = jag.getActivityOutputConnectorDescriptors(tid.getActivityId());
+ List<IConnectorDescriptor> outputs = jag.getActivityOutputMap().get(tid.getActivityId());
if (outputs != null) {
for (int i = 0; i < outputs.size(); ++i) {
final IConnectorDescriptor conn = outputs.get(i);
- RecordDescriptor recordDesc = jag.getJobSpecification().getConnectorRecordDescriptor(conn);
+ RecordDescriptor recordDesc = jag.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
IPartitionWriterFactory pwFactory = createPartitionWriterFactory(task, cPolicy, jobId, conn,
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
index c4156f4..34f3e9d 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopMapperOperatorDescriptor.java
@@ -230,9 +230,7 @@
data[1] = value;
writer.writeData(data);
}
- };
- ;
- ;
+ };
OutputCommitter outputCommitter = new org.apache.hadoop.mapreduce.lib.output.NullOutputFormat()
.getOutputCommitter(new TaskAttemptContext(conf, new TaskAttemptID()));
@@ -254,9 +252,7 @@
public Counter getCounter(Enum<?> arg0) {
return null;
}
- };
- ;
- ;
+ };
context = new org.apache.hadoop.mapreduce.Mapper().new Context(conf, new TaskAttemptID(),
newReader, recordWriter, outputCommitter, statusReporter,
(org.apache.hadoop.mapreduce.InputSplit) inputSplit);
@@ -308,8 +304,8 @@
inputSplitsProxy = new InputSplitsProxy(jobConf, splits);
}
- public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf, IHadoopClassFactory hadoopClassFactory)
- throws IOException {
+ public HadoopMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf jobConf,
+ IHadoopClassFactory hadoopClassFactory) throws IOException {
super(spec, 1, getRecordDescriptor(jobConf, hadoopClassFactory), jobConf, hadoopClassFactory);
}
@@ -412,7 +408,7 @@
return createSelfReadingMapper(ctx, recordDescriptor, partition);
} else {
return new DeserializedOperatorNodePushable(ctx, new MapperOperator(partition),
- recordDescProvider.getInputRecordDescriptor(this.odId, 0));
+ recordDescProvider.getInputRecordDescriptor(this.activityNodeId, 0));
}
} catch (Exception e) {
throw new HyracksDataException(e);
diff --git a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
index 35df8de..86a4c3c 100644
--- a/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
+++ b/hyracks-dataflow-hadoop/src/main/java/edu/uci/ics/hyracks/dataflow/hadoop/HadoopReducerOperatorDescriptor.java
@@ -311,8 +311,8 @@
private IComparatorFactory comparatorFactory;
private boolean useAsCombiner = false;
- public HadoopReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf conf, IComparatorFactory comparatorFactory,
- IHadoopClassFactory classFactory, boolean useAsCombiner) {
+ public HadoopReducerOperatorDescriptor(IOperatorDescriptorRegistry spec, JobConf conf,
+ IComparatorFactory comparatorFactory, IHadoopClassFactory classFactory, boolean useAsCombiner) {
super(spec, 1, getRecordDescriptor(conf, classFactory), conf, classFactory);
this.comparatorFactory = comparatorFactory;
this.useAsCombiner = useAsCombiner;
@@ -371,7 +371,7 @@
IOpenableDataWriterOperator op = new DeserializedPreclusteredGroupOperator(new int[] { 0 },
new IComparator[] { comparatorFactory.createComparator() }, new ReducerAggregator(createReducer()));
return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
+ getActivityId(), 0));
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
index dd9e6ad..9048670 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractSingleActivityOperatorDescriptor.java
@@ -36,7 +36,7 @@
@Override
public final void contributeActivities(IActivityGraphBuilder builder) {
- builder.addActivity(this);
+ builder.addActivity(this, this);
for (int i = 0; i < getInputArity(); ++i) {
builder.addSourceEdge(i, this, i);
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
index 1d6b69d..a69fd01 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/OneToOneConnectorDescriptor.java
@@ -24,12 +24,11 @@
import edu.uci.ics.hyracks.api.constraints.IConstraintAcceptor;
import edu.uci.ics.hyracks.api.constraints.expressions.PartitionCountExpression;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobActivityGraph;
-import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicChannelReader;
import edu.uci.ics.hyracks.dataflow.std.collectors.NonDeterministicFrameReader;
@@ -63,12 +62,11 @@
@Override
public void contributeSchedulingConstraints(IConstraintAcceptor constraintAcceptor, JobActivityGraph plan,
ICCApplicationContext appCtx) {
- JobSpecification jobSpec = plan.getJobSpecification();
- IOperatorDescriptor consumer = jobSpec.getConsumer(this);
- IOperatorDescriptor producer = jobSpec.getProducer(this);
+ OperatorDescriptorId consumer = plan.getConsumerActivity(getConnectorId()).getOperatorDescriptorId();
+ OperatorDescriptorId producer = plan.getProducerActivity(getConnectorId()).getOperatorDescriptorId();
- constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer.getOperatorId()),
- new PartitionCountExpression(producer.getOperatorId())));
+ constraintAcceptor.addConstraint(new Constraint(new PartitionCountExpression(consumer),
+ new PartitionCountExpression(producer)));
}
@Override
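The endpoints are now recovered from the activity graph (via the connector's producer and consumer activities) rather than from the job specification, but the emitted constraint is unchanged in meaning: a one-to-one connector requires equal partition counts on both sides. A hypothetical stand-alone check expressing the same invariant:

    // Equivalent invariant, stated imperatively (illustration only):
    static void checkOneToOne(int producerPartitions, int consumerPartitions) {
        if (producerPartitions != consumerPartitions) {
            throw new IllegalStateException("1:1 connector requires equal partition counts");
        }
    }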
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
index 584d398..e05d80d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/AbstractFileWriteOperatorDescriptor.java
@@ -91,6 +91,6 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new FileWriteOperator(partition),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
index 2a205cc..6fb0189 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/PlainFileWriterOperatorDescriptor.java
@@ -50,7 +50,8 @@
* @param fileSplitProvider
* @param delim
*/
- public PlainFileWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvider, String delim) {
+ public PlainFileWriterOperatorDescriptor(IOperatorDescriptorRegistry spec, IFileSplitProvider fileSplitProvider,
+ String delim) {
super(spec, 1, 0);
this.fileSplitProvider = fileSplitProvider;
this.delim = delim;
@@ -74,9 +75,9 @@
final FileSplit[] splits = fileSplitProvider.getFileSplits();
// Frame accessor
final FrameTupleAccessor frameTupleAccessor = new FrameTupleAccessor(ctx.getFrameSize(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
// Record descriptor
- final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ final RecordDescriptor recordDescriptor = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new AbstractUnaryInputSinkOperatorNodePushable() {
private BufferedWriter out;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 9e64ebb..9fd45e9 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -91,10 +91,10 @@
AggregateActivity aggregateAct = new AggregateActivity(new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
MergeActivity mergeAct = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
- builder.addActivity(aggregateAct);
+ builder.addActivity(this, aggregateAct);
builder.addSourceEdge(0, aggregateAct, 0);
- builder.addActivity(mergeAct);
+ builder.addActivity(this, mergeAct);
builder.addTargetEdge(0, mergeAct, 0);
builder.addBlockingEdge(aggregateAct, mergeAct);
@@ -113,7 +113,7 @@
throws HyracksDataException {
return new ExternalGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keyFields,
framesLimit, comparatorFactories, firstNormalizerFactory, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0],
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0],
spillableTableFactory);
}
}
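One cosmetic inconsistency survives this hunk: AggregateActivity is built with new ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID) while MergeActivity uses the odId field directly. Both expressions denote the same operator id, so behavior is unaffected.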
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
index 91f80a7..034e13f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/hash/HashGroupOperatorDescriptor.java
@@ -47,9 +47,9 @@
private final int tableSize;
- public HashGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys, ITuplePartitionComputerFactory tpcf,
- IBinaryComparatorFactory[] comparatorFactories, IAggregatorDescriptorFactory aggregatorFactory,
- RecordDescriptor outRecordDescriptor, int tableSize) {
+ public HashGroupOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys,
+ ITuplePartitionComputerFactory tpcf, IBinaryComparatorFactory[] comparatorFactories,
+ IAggregatorDescriptorFactory aggregatorFactory, RecordDescriptor outRecordDescriptor, int tableSize) {
super(spec, 1, 1);
this.keys = keys;
this.tpcf = tpcf;
@@ -69,10 +69,10 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
HashBuildActivity ha = new HashBuildActivity(new ActivityId(odId, HASH_BUILD_ACTIVITY_ID));
- builder.addActivity(ha);
+ builder.addActivity(this, ha);
OutputActivity oa = new OutputActivity(new ActivityId(odId, OUTPUT_ACTIVITY_ID));
- builder.addActivity(oa);
+ builder.addActivity(this, oa);
builder.addSourceEdge(0, ha, 0);
builder.addTargetEdge(0, oa, 0);
@@ -91,7 +91,7 @@
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
return new HashGroupBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition), keys, tpcf,
comparatorFactories, aggregatorFactory, tableSize, recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0), recordDescriptors[0]);
+ getActivityId(), 0), recordDescriptors[0]);
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
index 9df5e44..d37b818 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/preclustered/PreclusteredGroupOperatorDescriptor.java
@@ -14,7 +14,6 @@
*/
package edu.uci.ics.hyracks.dataflow.std.group.preclustered;
-
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -47,6 +46,6 @@
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
return new PreclusteredGroupOperatorNodePushable(ctx, groupFields, comparatorFactories, aggregatorFactory,
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0), recordDescriptors[0]);
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), recordDescriptors[0]);
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index ef5cb94..6c58d6f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -45,9 +45,10 @@
private final boolean isLeftOuter;
private final INullWriterFactory[] nullWriterFactories1;
- public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
- double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor) {
+ public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+ int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -62,10 +63,10 @@
recordDescriptors[0] = recordDescriptor;
}
- public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
- double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
- INullWriterFactory[] nullWriterFactories1) {
+ public GraceHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+ int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1) {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -82,19 +83,19 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- HashPartitionActivityNode rpart = new HashPartitionActivityNode(new ActivityId(odId, RPARTITION_ACTIVITY_ID),
- keys0, 0);
- HashPartitionActivityNode spart = new HashPartitionActivityNode(new ActivityId(odId, SPARTITION_ACTIVITY_ID),
- keys1, 1);
- JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID));
+ ActivityId rpartAid = new ActivityId(odId, RPARTITION_ACTIVITY_ID);
+ HashPartitionActivityNode rpart = new HashPartitionActivityNode(rpartAid, keys0);
+ ActivityId spartAid = new ActivityId(odId, SPARTITION_ACTIVITY_ID);
+ HashPartitionActivityNode spart = new HashPartitionActivityNode(spartAid, keys1);
+ JoinActivityNode join = new JoinActivityNode(new ActivityId(odId, JOIN_ACTIVITY_ID), rpartAid, spartAid);
- builder.addActivity(rpart);
+ builder.addActivity(this, rpart);
builder.addSourceEdge(0, rpart, 0);
- builder.addActivity(spart);
+ builder.addActivity(this, spart);
builder.addSourceEdge(1, spart, 0);
- builder.addActivity(join);
+ builder.addActivity(this, join);
builder.addBlockingEdge(rpart, spart);
builder.addBlockingEdge(spart, join);
@@ -107,13 +108,11 @@
private class HashPartitionActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- private int operatorInputIndex;
private int keys[];
- public HashPartitionActivityNode(ActivityId id, int keys[], int operatorInputIndex) {
+ public HashPartitionActivityNode(ActivityId id, int keys[]) {
super(id);
this.keys = keys;
- this.operatorInputIndex = operatorInputIndex;
}
@Override
@@ -121,23 +120,28 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new GraceHashJoinPartitionBuildOperatorNodePushable(ctx, new TaskId(getActivityId(), partition),
keys, hashFunctionFactories, comparatorFactories, (int) Math.ceil(Math.sqrt(inputsize0 * factor
- / nPartitions)), recordDescProvider.getInputRecordDescriptor(getOperatorId(),
- operatorInputIndex));
+ / nPartitions)), recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
}
}
private class JoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public JoinActivityNode(ActivityId id) {
+ private final ActivityId rpartAid;
+
+ private final ActivityId spartAid;
+
+ public JoinActivityNode(ActivityId id, ActivityId rpartAid, ActivityId spartAid) {
super(id);
+ this.rpartAid = rpartAid;
+ this.spartAid = spartAid;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(rpartAid, 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(spartAid, 0);
int numPartitions = (int) Math.ceil(Math.sqrt(inputsize0 * factor / nPartitions));
return new GraceHashJoinOperatorNodePushable(ctx, new TaskId(new ActivityId(getOperatorId(),
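This hunk introduces the idiom the later join operators repeat: allocate both ActivityIds before constructing the activity nodes and hand each node its peer's id, so that at runtime it can ask the provider for the peer's (single) input descriptor at index 0. That is what makes the per-input operatorInputIndex field obsolete. The generic shape of the idiom, with hypothetical names:

    // Cross-wire two activities of one operator (all names are placeholders):
    ActivityId leftAid = new ActivityId(odId, LEFT_ACTIVITY_ID);
    ActivityId rightAid = new ActivityId(odId, RIGHT_ACTIVITY_ID);
    IActivity left = new LeftActivity(leftAid, rightAid);   // left knows right's id
    IActivity right = new RightActivity(rightAid, leftAid); // right knows left's id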
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 1eaf3bf..4f9b987 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -84,10 +84,10 @@
* @param recordDescriptor
* @throws HyracksDataException
*/
- public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
- double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor)
- throws HyracksDataException {
+ public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+ int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor) throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -102,10 +102,11 @@
recordDescriptors[0] = recordDescriptor;
}
- public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0, int recordsPerFrame,
- double factor, int[] keys0, int[] keys1, IBinaryHashFunctionFactory[] hashFunctionFactories,
- IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor, boolean isLeftOuter,
- INullWriterFactory[] nullWriterFactories1) throws HyracksDataException {
+ public HybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, int inputsize0,
+ int recordsPerFrame, double factor, int[] keys0, int[] keys1,
+ IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
+ RecordDescriptor recordDescriptor, boolean isLeftOuter, INullWriterFactory[] nullWriterFactories1)
+ throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
this.inputsize0 = inputsize0;
@@ -122,15 +123,15 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(new ActivityId(odId,
- BUILD_AND_PARTITION_ACTIVITY_ID));
- PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(new ActivityId(odId,
- PARTITION_AND_JOIN_ACTIVITY_ID));
+ ActivityId p1Aid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
+ ActivityId p2Aid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
+ BuildAndPartitionActivityNode phase1 = new BuildAndPartitionActivityNode(p1Aid, p2Aid);
+ PartitionAndJoinActivityNode phase2 = new PartitionAndJoinActivityNode(p2Aid, p1Aid);
- builder.addActivity(phase1);
+ builder.addActivity(this, phase1);
builder.addSourceEdge(1, phase1, 0);
- builder.addActivity(phase2);
+ builder.addActivity(this, phase2);
builder.addSourceEdge(0, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -166,15 +167,18 @@
private class BuildAndPartitionActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public BuildAndPartitionActivityNode(ActivityId id) {
+ private final ActivityId joinAid;
+
+ public BuildAndPartitionActivityNode(ActivityId id, ActivityId joinAid) {
super(id);
+ this.joinAid = joinAid;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -351,15 +355,18 @@
private class PartitionAndJoinActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public PartitionAndJoinActivityNode(ActivityId id) {
+ private final ActivityId buildAid;
+
+ public PartitionAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
super(id);
+ this.buildAid = buildAid;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index ddbe417..e0a5613 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -48,9 +48,6 @@
import edu.uci.ics.hyracks.dataflow.std.structures.SerializableHashTable;
public class InMemoryHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
- private static final int BUILD_ACTIVITY_ID = 0;
- private static final int PROBE_ACTIVITY_ID = 1;
-
private static final long serialVersionUID = 1L;
private final int[] keys0;
private final int[] keys1;
@@ -91,13 +88,15 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- HashBuildActivityNode hba = new HashBuildActivityNode(new ActivityId(odId, 0));
- HashProbeActivityNode hpa = new HashProbeActivityNode(new ActivityId(odId, 1));
+ ActivityId hbaId = new ActivityId(odId, 0);
+ ActivityId hpaId = new ActivityId(odId, 1);
+ HashBuildActivityNode hba = new HashBuildActivityNode(hbaId, hpaId);
+ HashProbeActivityNode hpa = new HashProbeActivityNode(hpaId);
- builder.addActivity(hba);
+ builder.addActivity(this, hba);
builder.addSourceEdge(1, hba, 0);
- builder.addActivity(hpa);
+ builder.addActivity(this, hpa);
builder.addSourceEdge(0, hpa, 0);
builder.addTargetEdge(0, hpa, 0);
@@ -129,17 +128,18 @@
private class HashBuildActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public HashBuildActivityNode(ActivityId id) {
+ private final ActivityId hpaId;
+
+ public HashBuildActivityNode(ActivityId id, ActivityId hpaId) {
super(id);
+ this.hpaId = hpaId;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- final RecordDescriptor rd0 = recordDescProvider
- .getInputRecordDescriptor(getOperatorId(), BUILD_ACTIVITY_ID);
- final RecordDescriptor rd1 = recordDescProvider
- .getInputRecordDescriptor(getOperatorId(), PROBE_ACTIVITY_ID);
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(hpaId, 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -204,8 +204,8 @@
@Override
public void open() throws HyracksDataException {
- state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
- BUILD_ACTIVITY_ID), partition));
+ state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(), 0),
+ partition));
writer.open();
}
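The build-to-probe handoff continues to use task-scoped state objects keyed by TaskId; with the activity-id constants gone, the probe side reconstructs the build task's id with the literal minor id 0. A sketch of both ends, where the setStateObject call site and the state's id are assumptions consistent with the retrieval shown above:

    // Build side (assumed): publish the in-memory hash table under this
    // build task's id, i.e. new TaskId(new ActivityId(getOperatorId(), 0), partition).
    ctx.setStateObject(state);

    // Probe side (as in the hunk above): rebuild that TaskId to fetch it.
    HashBuildTaskState state = (HashBuildTaskState) ctx.getStateObject(
            new TaskId(new ActivityId(getOperatorId(), 0), partition));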
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 8d4f57a..a699703 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -58,14 +58,15 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- JoinCacheActivityNode jc = new JoinCacheActivityNode(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID));
- NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(new ActivityId(getOperatorId(),
- NL_JOIN_ACTIVITY_ID));
+ ActivityId jcaId = new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID);
+ ActivityId nljAid = new ActivityId(getOperatorId(), NL_JOIN_ACTIVITY_ID);
+ JoinCacheActivityNode jc = new JoinCacheActivityNode(jcaId, nljAid);
+ NestedLoopJoinActivityNode nlj = new NestedLoopJoinActivityNode(nljAid);
- builder.addActivity(jc);
+ builder.addActivity(this, jc);
builder.addSourceEdge(1, jc, 0);
- builder.addActivity(nlj);
+ builder.addActivity(this, nlj);
builder.addSourceEdge(0, nlj, 0);
builder.addTargetEdge(0, nlj, 0);
@@ -96,15 +97,18 @@
private class JoinCacheActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public JoinCacheActivityNode(ActivityId id) {
+ private final ActivityId nljAid;
+
+ public JoinCacheActivityNode(ActivityId id, ActivityId nljAid) {
super(id);
+ this.nljAid = nljAid;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
- final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
+ final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index da02f3d..3a7ee2c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -93,7 +93,6 @@
*/
public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
-
private static final int BUILD_AND_PARTITION_ACTIVITY_ID = 0;
private static final int PARTITION_AND_JOIN_ACTIVITY_ID = 1;
@@ -162,15 +161,15 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(new ActivityId(odId,
- BUILD_AND_PARTITION_ACTIVITY_ID));
- ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(new ActivityId(odId,
- PARTITION_AND_JOIN_ACTIVITY_ID));
+ ActivityId buildAid = new ActivityId(odId, BUILD_AND_PARTITION_ACTIVITY_ID);
+ ActivityId probeAid = new ActivityId(odId, PARTITION_AND_JOIN_ACTIVITY_ID);
+ PartitionAndBuildActivityNode phase1 = new PartitionAndBuildActivityNode(buildAid, probeAid);
+ ProbeAndJoinActivityNode phase2 = new ProbeAndJoinActivityNode(probeAid, buildAid);
- builder.addActivity(phase1);
+ builder.addActivity(this, phase1);
builder.addSourceEdge(0, phase1, 0);
- builder.addActivity(phase2);
+ builder.addActivity(this, phase2);
builder.addSourceEdge(1, phase2, 0);
builder.addBlockingEdge(phase1, phase2);
@@ -236,16 +235,19 @@
private class PartitionAndBuildActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
- public PartitionAndBuildActivityNode(ActivityId id) {
+ private final ActivityId probeAid;
+
+ public PartitionAndBuildActivityNode(ActivityId id, ActivityId probeAid) {
super(id);
+ this.probeAid = probeAid;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
+ final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; i++) {
@@ -314,16 +316,19 @@
private static final long serialVersionUID = 1L;
- public ProbeAndJoinActivityNode(ActivityId id) {
+ private final ActivityId buildAid;
+
+ public ProbeAndJoinActivityNode(ActivityId id, ActivityId buildAid) {
super(id);
+ this.buildAid = buildAid;
}
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
- final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
- final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
+ final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
+ final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
index 7de8f95..30aae7f 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/map/DeserializedMapperOperatorDescriptor.java
@@ -64,8 +64,8 @@
private final IDeserializedMapperFactory mapperFactory;
- public DeserializedMapperOperatorDescriptor(IOperatorDescriptorRegistry spec, IDeserializedMapperFactory mapperFactory,
- RecordDescriptor recordDescriptor) {
+ public DeserializedMapperOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IDeserializedMapperFactory mapperFactory, RecordDescriptor recordDescriptor) {
super(spec, 1, 1);
this.mapperFactory = mapperFactory;
recordDescriptors[0] = recordDescriptor;
@@ -75,6 +75,6 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new MapperOperator(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index d38b0a4..ba8c00c 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -54,10 +54,10 @@
MaterializerActivityNode ma = new MaterializerActivityNode(new ActivityId(odId, MATERIALIZER_ACTIVITY_ID));
ReaderActivityNode ra = new ReaderActivityNode(new ActivityId(odId, READER_ACTIVITY_ID));
- builder.addActivity(ma);
+ builder.addActivity(this, ma);
builder.addSourceEdge(0, ma, 0);
- builder.addActivity(ra);
+ builder.addActivity(this, ra);
builder.addTargetEdge(0, ra, 0);
builder.addBlockingEdge(ma, ra);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
index 56a4f79..47a8616 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/PrinterOperatorDescriptor.java
@@ -64,6 +64,6 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
return new DeserializedOperatorNodePushable(ctx, new PrinterOperator(),
- recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0));
+ recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
}
}
\ No newline at end of file
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
index c8c8fb8..aad0e66 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SplitVectorOperatorDescriptor.java
@@ -105,7 +105,7 @@
}
};
return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
+ getActivityId(), 0));
}
}
@@ -158,8 +158,8 @@
writer.fail();
}
};
- return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getInputRecordDescriptor(
- getOperatorId(), 0));
+ return new DeserializedOperatorNodePushable(ctx, op, recordDescProvider.getOutputRecordDescriptor(
+ getActivityId(), 0));
}
}
@@ -178,10 +178,10 @@
CollectActivity ca = new CollectActivity(new ActivityId(odId, COLLECT_ACTIVITY_ID));
SplitActivity sa = new SplitActivity(new ActivityId(odId, SPLIT_ACTIVITY_ID));
- builder.addActivity(ca);
+ builder.addActivity(this, ca);
builder.addSourceEdge(0, ca, 0);
- builder.addActivity(sa);
+ builder.addActivity(this, sa);
builder.addTargetEdge(0, sa, 0);
builder.addBlockingEdge(ca, sa);
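Note that the second hunk in this file is not a purely mechanical rename: SplitActivity has no dataflow input (it is fed through the blocking edge from CollectActivity and contributes the operator's target edge), so an activity-keyed input lookup is presumably undefined for it; its pushable is therefore configured with the activity's output descriptor instead. Reading of the changed call:

    // Split side: describe the frames the activity emits, not frames it consumes.
    recordDescProvider.getOutputRecordDescriptor(getActivityId(), 0);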
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index 708992b..be71b44 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -75,10 +75,10 @@
SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
- builder.addActivity(sa);
+ builder.addActivity(this, sa);
builder.addSourceEdge(0, sa, 0);
- builder.addActivity(ma);
+ builder.addActivity(this, ma);
builder.addTargetEdge(0, ma, 0);
builder.addBlockingEdge(sa, ma);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index be1e067..3b3c28d 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -67,10 +67,10 @@
SortActivity sa = new SortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
MergeActivity ma = new MergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
- builder.addActivity(sa);
+ builder.addActivity(this, sa);
builder.addSourceEdge(0, sa, 0);
- builder.addActivity(ma);
+ builder.addActivity(this, ma);
builder.addTargetEdge(0, ma, 0);
builder.addBlockingEdge(sa, ma);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
index 23169b5..d9c16d8 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/OptimizedExternalSortOperatorDescriptor.java
@@ -98,10 +98,10 @@
OptimizedSortActivity osa = new OptimizedSortActivity(new ActivityId(odId, SORT_ACTIVITY_ID));
OptimizedMergeActivity oma = new OptimizedMergeActivity(new ActivityId(odId, MERGE_ACTIVITY_ID));
- builder.addActivity(osa);
+ builder.addActivity(this, osa);
builder.addSourceEdge(0, osa, 0);
- builder.addActivity(oma);
+ builder.addActivity(this, oma);
builder.addTargetEdge(0, oma, 0);
builder.addBlockingEdge(osa, oma);
@@ -191,8 +191,8 @@
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
- getOperatorId(), SORT_ACTIVITY_ID), partition));
+ OptimizedSortTaskState state = (OptimizedSortTaskState) ctx.getStateObject(new TaskId(
+ new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
List<IFrameReader> runs = state.runs;
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index 6c93b61..7ac9f58 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -40,7 +40,7 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
UnionActivityNode uba = new UnionActivityNode(new ActivityId(getOperatorId(), 0));
- builder.addActivity(uba);
+ builder.addActivity(this, uba);
for (int i = 0; i < inputArity; ++i) {
builder.addSourceEdge(i, uba, i);
}
@@ -58,7 +58,7 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
throws HyracksDataException {
- RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
+ RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new UnionOperator(ctx, inRecordDesc);
}
}
diff --git a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
index d556285..0d5a627 100644
--- a/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++ b/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -87,8 +87,7 @@
/**
* Test of aggregations using the locality-aware connector. The two output files should be the
- * same, each of which is the aggregation of two copies of the lineitem.tbl.
- *
+ * same, each of which is the aggregation of two copies of the lineitem.tbl.
* Note that if the hashing connector is not working correctly, the two files may differ. Conversely,
* even if the output files are the same, the hashing may still have other problems.
*
@@ -99,66 +98,47 @@
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
+ "asterix-003", "asterix-004");
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true) }),
- outputRec, tableSize);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- "asterix-005", "asterix-006");
-
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
+
BitSet nodemap = new BitSet(8);
-
+
nodemap.set(0);
nodemap.set(2);
nodemap.set(5);
nodemap.set(7);
- IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }), new HashtableLocalityMap(nodemap));
-
- new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "localityAwareSumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- "asterix-005", "asterix-006");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -166,11 +146,11 @@
spec.addRoot(printer);
runTest(spec);
}
-
+
/**
* Test for the locality-aware connector, using the global hashing node mapper. This should have
* exactly the same result as using {@link MToNPartitioningConnectorDescriptor}.
- *
+ *
* @throws Exception
*/
@Test
@@ -178,59 +158,40 @@
JobSpecification spec = new JobSpecification();
- FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(
- spec, splitProvider, tupleParserFactory, desc);
+ FileScanOperatorDescriptor csvScanner = new FileScanOperatorDescriptor(spec, splitProvider, tupleParserFactory,
+ desc);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
- csvScanner, "asterix-001", "asterix-002", "asterix-003", "asterix-004");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, csvScanner, "asterix-001", "asterix-002",
+ "asterix-003", "asterix-004");
- RecordDescriptor outputRec = new RecordDescriptor(
- new ISerializerDeserializer[] {
- UTF8StringSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- IntegerSerializerDeserializer.INSTANCE,
- FloatSerializerDeserializer.INSTANCE });
+ RecordDescriptor outputRec = new RecordDescriptor(new ISerializerDeserializer[] {
+ UTF8StringSerializerDeserializer.INSTANCE, IntegerSerializerDeserializer.INSTANCE,
+ IntegerSerializerDeserializer.INSTANCE, FloatSerializerDeserializer.INSTANCE });
int[] keyFields = new int[] { 0 };
int tableSize = 8;
- HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(
- spec,
- keyFields,
- new FieldHashPartitionComputerFactory(
- keyFields,
+ HashGroupOperatorDescriptor grouper = new HashGroupOperatorDescriptor(spec, keyFields,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }),
- new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory
- .of(UTF8StringPointable.FACTORY) },
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, true),
- new IntSumFieldAggregatorFactory(3, true),
- new FloatSumFieldAggregatorFactory(5, true) }),
- outputRec, tableSize);
+ new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+ new MultiFieldsAggregatorFactory(new IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, true), new IntSumFieldAggregatorFactory(3, true),
+ new FloatSumFieldAggregatorFactory(5, true) }), outputRec, tableSize);
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
- "asterix-005", "asterix-006");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper, "asterix-005", "asterix-006");
- IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
- keyFields,
+ IConnectorDescriptor conn1 = new LocalityAwareMToNPartitioningConnectorDescriptor(spec,
+ new FieldHashPartitionComputerFactory(keyFields,
new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
.of(UTF8StringPointable.FACTORY) }), new GlobalHashingLocalityMap());
-
- new MToNPartitioningConnectorDescriptor(
- spec,
- new FieldHashPartitionComputerFactory(
- keyFields,
- new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory
- .of(UTF8StringPointable.FACTORY) }));
+
spec.connect(conn1, csvScanner, 0, grouper, 0);
- AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec,
- "localityAwareSumInmemGroupTest");
+ AbstractSingleActivityOperatorDescriptor printer = getPrinter(spec, "localityAwareSumInmemGroupTest");
- PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer,
- "asterix-005", "asterix-006");
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, "asterix-005", "asterix-006");
IConnectorDescriptor conn2 = new OneToOneConnectorDescriptor(spec);
spec.connect(conn2, grouper, 0, printer, 0);
@@ -238,16 +199,14 @@
spec.addRoot(printer);
runTest(spec);
}
-
- private AbstractSingleActivityOperatorDescriptor getPrinter(
- IOperatorDescriptorRegistry spec, String prefix) throws IOException {
- AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(
- spec, new ConstantFileSplitProvider(new FileSplit[] {
- new FileSplit("asterix-005", createTempFile()
- .getAbsolutePath()),
- new FileSplit("asterix-006", createTempFile()
- .getAbsolutePath()) }), "\t");
+ private AbstractSingleActivityOperatorDescriptor getPrinter(IOperatorDescriptorRegistry spec, String prefix)
+ throws IOException {
+
+ AbstractSingleActivityOperatorDescriptor printer = new PlainFileWriterOperatorDescriptor(spec,
+ new ConstantFileSplitProvider(new FileSplit[] {
+ new FileSplit("asterix-005", createTempFile().getAbsolutePath()),
+ new FileSplit("asterix-006", createTempFile().getAbsolutePath()) }), "\t");
return printer;
}
diff --git a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 9dda175..77a7a1e 100644
--- a/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -38,7 +38,7 @@
super(opDesc, ctx, partition, recordDescProvider);
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
- this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
if (lowKeyFields != null && lowKeyFields.length > 0) {
lowKey = new PermutingFrameTupleReference();
lowKey.setFieldPermutation(lowKeyFields);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
index d9f3a5a..a2d78a4 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorNodePushable.java
@@ -49,7 +49,7 @@
public void open() throws HyracksDataException {
AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
.getOperatorDescriptor();
- RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), recDesc);
try {
treeIndexHelper.init(false);
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index fb9c965..e05568f 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -55,7 +55,7 @@
public void open() throws HyracksDataException {
AbstractTreeIndexOperatorDescriptor opDesc = (AbstractTreeIndexOperatorDescriptor) treeIndexHelper
.getOperatorDescriptor();
- RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(treeIndexHelper.getHyracksTaskContext().getFrameSize(), inputRecDesc);
writeBuffer = treeIndexHelper.getHyracksTaskContext().allocateFrame();
writer.open();
diff --git a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
index 0bd970a..a4c8e7a 100644
--- a/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexSearchOperatorNodePushable.java
@@ -54,7 +54,7 @@
int partition, IRecordDescriptorProvider recordDescProvider) {
treeIndexHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(
opDesc, ctx, partition);
- this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ this.recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
}
protected abstract ISearchPredicate createSearchPredicate();
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index 94409de..a146479 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -46,9 +46,9 @@
}
@Override
- public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider,
- int partition, int nPartitions) throws HyracksDataException {
- return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(odId, 0),
- recordDescriptors[0], tokenizerFactory.createTokenizer(), tokenFields, keyFields);
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
+ getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), tokenFields, keyFields);
}
}
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
index 62daff2..c6fa56d 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexBulkLoadOperatorNodePushable.java
@@ -52,7 +52,7 @@
public void open() throws HyracksDataException {
AbstractInvertedIndexOperatorDescriptor opDesc = (AbstractInvertedIndexOperatorDescriptor) btreeDataflowHelper
.getOperatorDescriptor();
- RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(btreeDataflowHelper.getHyracksTaskContext().getFrameSize(), recDesc);
// BTree.
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
index 8536a07..b9a43ea 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/dataflow/InvertedIndexSearchOperatorNodePushable.java
@@ -71,7 +71,7 @@
@Override
public void open() throws HyracksDataException {
- RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
+ RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(btreeDataflowHelper.getHyracksTaskContext().getFrameSize(), recDesc);
tuple = new FrameTupleReference();
// BTree.