rewrite hyracks activity cluster graphs to eliminate one-to-one connectors
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_job_rewriter@2964 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index f36b7b3..91a39d7 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -17,11 +17,13 @@
import edu.uci.ics.hyracks.api.job.JobFlag;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.job.JobSpecification;
+import edu.uci.ics.hyracks.api.rewriter.ActivityClusterGraphRewriter;
public class JobSpecificationActivityClusterGraphGeneratorFactory implements IActivityClusterGraphGeneratorFactory {
private static final long serialVersionUID = 1L;
private final JobSpecification spec;
+ private final ActivityClusterGraphRewriter rewriter = new ActivityClusterGraphRewriter();
public JobSpecificationActivityClusterGraphGeneratorFactory(JobSpecification jobSpec) {
this.spec = jobSpec;
@@ -78,6 +80,7 @@
return new IActivityClusterGraphGenerator() {
@Override
public ActivityClusterGraph initialize() {
+ rewriter.rewrite(acg);
return acg;
}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
index 6698ff7..9fb2b08 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityCluster.java
@@ -33,7 +33,7 @@
import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
-public final class ActivityCluster implements Serializable {
+public class ActivityCluster implements Serializable {
private static final long serialVersionUID = 1L;
private final ActivityClusterGraph acg;
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
new file mode 100644
index 0000000..cb2acbc
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/ActivityClusterGraphRewriter.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.runtime.SuperActivity;
+
+public class ActivityClusterGraphRewriter implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private Map<IActivity, SuperActivity> invertedActivitySuperActivityMap = new HashMap<IActivity, SuperActivity>();
+
+ public void rewrite(ActivityClusterGraph acg) {
+ acg.getActivityMap().clear();
+ acg.getConnectorMap().clear();
+ for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+ rewriteIntraActivityCluster(entry.getValue());
+ }
+ for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+ rewriteInterActivityCluster(entry.getValue());
+ }
+ invertedActivitySuperActivityMap.clear();
+ }
+
+ /**
+ * rewrite the blocking relationship among activity cluster
+ *
+ * @param ac
+ * the activity cluster to be rewritten
+ */
+ private void rewriteInterActivityCluster(ActivityCluster ac) {
+ Map<ActivityId, Set<ActivityId>> blocked2BlockerMap = ac.getBlocked2BlockerMap();
+ Map<ActivityId, ActivityId> invertedAid2SuperAidMap = new HashMap<ActivityId, ActivityId>();
+ for (Entry<IActivity, SuperActivity> entry : invertedActivitySuperActivityMap.entrySet()) {
+ invertedAid2SuperAidMap.put(entry.getKey().getActivityId(), entry.getValue().getActivityId());
+ }
+ Map<ActivityId, Set<ActivityId>> replacedBlocked2BlockerMap = new HashMap<ActivityId, Set<ActivityId>>();
+ for (Entry<ActivityId, Set<ActivityId>> entry : blocked2BlockerMap.entrySet()) {
+ ActivityId blocked = entry.getKey();
+ ActivityId replacedBlocked = invertedAid2SuperAidMap.get(blocked);
+ Set<ActivityId> blockers = entry.getValue();
+ Set<ActivityId> replacedBlockers = null;
+ if (blockers != null) {
+ replacedBlockers = new HashSet<ActivityId>();
+ for (ActivityId blocker : blockers) {
+ replacedBlockers.add(invertedAid2SuperAidMap.get(blocker));
+ ac.getDependencies().add(
+ ac.getActivityClusterGraph().getActivityMap().get(invertedAid2SuperAidMap.get(blocker)));
+ }
+ }
+ replacedBlocked2BlockerMap.put(replacedBlocked, replacedBlockers);
+ }
+ blocked2BlockerMap.clear();
+ blocked2BlockerMap.putAll(replacedBlocked2BlockerMap);
+ }
+
+ /**
+ * rewrite an activity cluster internally
+ *
+ * @param ac
+ * the activity cluster to be rewritten
+ * the activity cluster to be rewritten
+ */
+ private void rewriteIntraActivityCluster(ActivityCluster ac) {
+ Map<ActivityId, IActivity> activities = ac.getActivityMap();
+ Map<ActivityId, List<IConnectorDescriptor>> activityInputMap = ac.getActivityInputMap();
+ Map<ActivityId, List<IConnectorDescriptor>> activityOutputMap = ac.getActivityOutputMap();
+ Map<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> connectorActivityMap = ac
+ .getConnectorActivityMap();
+ ActivityClusterGraph acg = ac.getActivityClusterGraph();
+ Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+ Map<ActivityId, SuperActivity> superActivities = new HashMap<ActivityId, SuperActivity>();
+ Map<ActivityId, Queue<IActivity>> toBeExpendedMap = new HashMap<ActivityId, Queue<IActivity>>();
+
+ /**
+ * Build the initial super activities
+ */
+ for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+ ActivityId activityId = entry.getKey();
+ IActivity activity = entry.getValue();
+ if (activityInputMap.get(activityId) == null) {
+ startActivities.put(activityId, activity);
+ /**
+ * use the start activity's id as the id of the super activity
+ */
+ createNewSuperActivity(ac, superActivities, toBeExpendedMap, invertedActivitySuperActivityMap,
+ activityId, activity);
+ }
+ }
+
+ /**
+ * expend one-to-one connected activity cluster by the BFS order
+ * after the while-loop, the original activities are partitioned
+ * into equivalent classes, one-per-super-activity
+ */
+ Map<ActivityId, SuperActivity> clonedSuperActivities = new HashMap<ActivityId, SuperActivity>();
+ while (toBeExpendedMap.size() > 0) {
+ clonedSuperActivities.clear();
+ clonedSuperActivities.putAll(superActivities);
+ for (Entry<ActivityId, SuperActivity> entry : clonedSuperActivities.entrySet()) {
+ ActivityId superActivityId = entry.getKey();
+ SuperActivity superActivity = entry.getValue();
+
+ /**
+ * for the case where the super activity has already been swallowed
+ */
+ if (superActivities.get(superActivityId) == null) {
+ continue;
+ }
+
+ /**
+ * expend the super activity
+ */
+ Queue<IActivity> toBeExpended = toBeExpendedMap.get(superActivityId);
+ if (toBeExpended == null) {
+ /**
+ * Nothing to expand
+ */
+ continue;
+ }
+ IActivity expendingActivity = toBeExpended.poll();
+ List<IConnectorDescriptor> outputConnectors = activityOutputMap.get(expendingActivity.getActivityId());
+ if (outputConnectors != null) {
+ for (IConnectorDescriptor outputConn : outputConnectors) {
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = connectorActivityMap
+ .get(outputConn.getConnectorId());
+ IActivity newActivity = endPoints.getRight().getLeft();
+ SuperActivity existingSuperActivity = invertedActivitySuperActivityMap.get(newActivity);
+ if (outputConn.getClass().getName().contains("OneToOneConnectorDescriptor")) {
+ /**
+ * expend the super activity cluster on an one-to-one out-bound connection
+ */
+ if (existingSuperActivity == null) {
+ superActivity.addActivity(newActivity);
+ toBeExpended.add(newActivity);
+ invertedActivitySuperActivityMap.put(newActivity, superActivity);
+ } else {
+ /**
+ * the two activities already in the same super activity
+ */
+ if (existingSuperActivity == superActivity) {
+ continue;
+ }
+ /**
+ * swallow an existing super activity
+ */
+ swallowExistingSuperActivity(superActivities, toBeExpendedMap,
+ invertedActivitySuperActivityMap, superActivity, superActivityId,
+ existingSuperActivity);
+ }
+ } else {
+ if (existingSuperActivity == null) {
+ /**
+ * create new activity
+ */
+ createNewSuperActivity(ac, superActivities, toBeExpendedMap,
+ invertedActivitySuperActivityMap, newActivity.getActivityId(), newActivity);
+ }
+ }
+ }
+ }
+
+ /**
+ * remove the to-be-expended queue if it is empty
+ */
+ if (toBeExpended.size() == 0) {
+ toBeExpendedMap.remove(superActivityId);
+ }
+ }
+ }
+
+ Map<ConnectorDescriptorId, IConnectorDescriptor> connMap = ac.getConnectorMap();
+ Map<ConnectorDescriptorId, RecordDescriptor> connRecordDesc = ac.getConnectorRecordDescriptorMap();
+ Map<SuperActivity, Integer> superActivityProducerPort = new HashMap<SuperActivity, Integer>();
+ Map<SuperActivity, Integer> superActivityConsumerPort = new HashMap<SuperActivity, Integer>();
+ for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+ superActivityProducerPort.put(entry.getValue(), 0);
+ superActivityConsumerPort.put(entry.getValue(), 0);
+ }
+
+ /**
+ * create a new activity cluster to replace the old activity cluster
+ */
+ ActivityCluster newActivityCluster = new ActivityCluster(acg, ac.getId());
+ newActivityCluster.setConnectorPolicyAssignmentPolicy(ac.getConnectorPolicyAssignmentPolicy());
+ for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+ newActivityCluster.addActivity(entry.getValue());
+ acg.getActivityMap().put(entry.getKey(), newActivityCluster);
+ }
+
+ /**
+ * Setup connectors: either inside a super activity or among super activities
+ */
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> entry : connectorActivityMap
+ .entrySet()) {
+ ConnectorDescriptorId connectorId = entry.getKey();
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> endPoints = entry.getValue();
+ IActivity producerActivity = endPoints.getLeft().getLeft();
+ IActivity consumerActivity = endPoints.getRight().getLeft();
+ int producerPort = endPoints.getLeft().getRight();
+ int consumerPort = endPoints.getRight().getRight();
+ RecordDescriptor recordDescriptor = connRecordDesc.get(connectorId);
+ IConnectorDescriptor conn = connMap.get(connectorId);
+ if (conn.getClass().getName().contains("OneToOneConnectorDescriptor")) {
+ /**
+ * connection edge between inner activities
+ */
+ SuperActivity residingSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+ residingSuperActivity.connect(conn, producerActivity, producerPort, consumerActivity, consumerPort,
+ recordDescriptor);
+ } else {
+ /**
+ * connection edge between super activities
+ */
+ SuperActivity producerSuperActivity = invertedActivitySuperActivityMap.get(producerActivity);
+ SuperActivity consumerSuperActivity = invertedActivitySuperActivityMap.get(consumerActivity);
+ int producerSAPort = superActivityProducerPort.get(producerSuperActivity);
+ int consumerSAPort = superActivityConsumerPort.get(consumerSuperActivity);
+ newActivityCluster.connect(conn, producerSuperActivity, producerSAPort, consumerSuperActivity,
+ consumerSAPort, recordDescriptor);
+
+ /**
+ * bridge the port
+ */
+ producerSuperActivity.setClusterOutputIndex(producerSAPort, producerActivity.getActivityId(),
+ producerPort);
+ consumerSuperActivity.setClusterInputIndex(consumerSAPort, consumerActivity.getActivityId(),
+ consumerPort);
+ acg.getConnectorMap().put(connectorId, newActivityCluster);
+ }
+ }
+
+ /**
+ * Set up the roots of the new activity cluster
+ */
+ for (Entry<ActivityId, SuperActivity> entry : superActivities.entrySet()) {
+ List<IConnectorDescriptor> connIds = newActivityCluster.getActivityOutputMap().get(entry.getKey());
+ if (connIds == null || connIds.size() == 0) {
+ newActivityCluster.addRoot(entry.getValue());
+ }
+ }
+
+ /**
+ * set up the blocked2Blocker mapping, which will be updated in the rewriteInterActivityCluster call
+ */
+ newActivityCluster.getBlocked2BlockerMap().putAll(ac.getBlocked2BlockerMap());
+
+ /**
+ * replace the old activity cluster with the new activity cluster
+ */
+ acg.getActivityClusterMap().put(ac.getId(), newActivityCluster);
+ }
+
+ /**
+ * Create a new super activity
+ *
+ * @param acg
+ * @param superActivities
+ * @param toBeExpendedMap
+ * @param invertedActivitySuperActivityMap
+ * @param activityId
+ * @param activity
+ */
+ private void createNewSuperActivity(ActivityCluster acg, Map<ActivityId, SuperActivity> superActivities,
+ Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+ Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, ActivityId activityId, IActivity activity) {
+ SuperActivity superActivity = new SuperActivity(acg.getActivityClusterGraph(), acg.getId(), activityId);
+ superActivities.put(activityId, superActivity);
+ superActivity.addActivity(activity);
+ Queue<IActivity> toBeExpended = new LinkedList<IActivity>();
+ toBeExpended.add(activity);
+ toBeExpendedMap.put(activityId, toBeExpended);
+ invertedActivitySuperActivityMap.put(activity, superActivity);
+ }
+
+ /**
+ * One super activity swallows another existing super activity
+ *
+ * @param superActivities
+ * @param toBeExpendedMap
+ * @param invertedActivitySuperActivityMap
+ * @param superActivity
+ * @param superActivityId
+ * @param existingSuperActivity
+ */
+ private void swallowExistingSuperActivity(Map<ActivityId, SuperActivity> superActivities,
+ Map<ActivityId, Queue<IActivity>> toBeExpendedMap,
+ Map<IActivity, SuperActivity> invertedActivitySuperActivityMap, SuperActivity superActivity,
+ ActivityId superActivityId, SuperActivity existingSuperActivity) {
+ ActivityId existingSuperActivityId = existingSuperActivity.getActivityId();
+ superActivities.remove(existingSuperActivityId);
+ for (Entry<ActivityId, IActivity> existingEntry : existingSuperActivity.getActivityMap().entrySet()) {
+ IActivity existingActivity = existingEntry.getValue();
+ superActivity.addActivity(existingActivity);
+ invertedActivitySuperActivityMap.put(existingActivity, superActivity);
+ }
+ Queue<IActivity> tbeQueue = toBeExpendedMap.get(superActivityId);
+ Queue<IActivity> existingTbeQueque = toBeExpendedMap.remove(existingSuperActivityId);
+ tbeQueue.addAll(existingTbeQueque);
+ }
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
new file mode 100644
index 0000000..2538bb9
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/OneToOneConnectedActivityCluster.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+
+/**
+ * All the connectors in an OneToOneConnectedCluster are OneToOneConnectorDescriptors.
+ *
+ * @author yingyib
+ */
+public class OneToOneConnectedActivityCluster extends ActivityCluster {
+
+ private static final long serialVersionUID = 1L;
+ protected final Map<Integer, Pair<ActivityId, Integer>> clusterInputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
+ protected final Map<Integer, Pair<ActivityId, Integer>> clusterOutputIndexMap = new HashMap<Integer, Pair<ActivityId, Integer>>();
+ protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterOutputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
+ protected final Map<Pair<ActivityId, Integer>, Integer> invertedClusterInputIndexMap = new HashMap<Pair<ActivityId, Integer>, Integer>();
+
+ public OneToOneConnectedActivityCluster(ActivityClusterGraph acg, ActivityClusterId id) {
+ super(acg, id);
+ }
+
+ public void setClusterOutputIndex(int clusterOutputIndex, ActivityId activityId, int activityOutputIndex) {
+ clusterOutputIndexMap.put(clusterOutputIndex, Pair.of(activityId, activityOutputIndex));
+ invertedClusterOutputIndexMap.put(Pair.of(activityId, activityOutputIndex), clusterOutputIndex);
+ }
+
+ public Pair<ActivityId, Integer> getActivityIdOutputIndex(int clusterOutputIndex) {
+ return clusterOutputIndexMap.get(clusterOutputIndex);
+ }
+
+ public void setClusterInputIndex(int clusterInputIndex, ActivityId activityId, int activityInputIndex) {
+ clusterInputIndexMap.put(clusterInputIndex, Pair.of(activityId, activityInputIndex));
+ invertedClusterInputIndexMap.put(Pair.of(activityId, activityInputIndex), clusterInputIndex);
+ }
+
+ public Pair<ActivityId, Integer> getActivityIdInputIndex(int clusterInputIndex) {
+ return clusterInputIndexMap.get(clusterInputIndex);
+ }
+
+ public int getClusterInputIndex(Pair<ActivityId, Integer> activityInputChannel) {
+ return invertedClusterInputIndexMap.get(activityInputChannel);
+ }
+
+ public int getClusterOutputIndex(Pair<ActivityId, Integer> activityOutputChannel) {
+ return invertedClusterOutputIndexMap.get(activityOutputChannel);
+ }
+
+}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
new file mode 100644
index 0000000..3fe7b61
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivity.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.ActivityCluster;
+import edu.uci.ics.hyracks.api.job.ActivityClusterGraph;
+import edu.uci.ics.hyracks.api.job.ActivityClusterId;
+import edu.uci.ics.hyracks.api.rewriter.OneToOneConnectedActivityCluster;
+
+/**
+ * This class can be used to execute a DAG of activities inside which
+ * there are only one-to-one connectors.
+ *
+ * @author yingyib
+ */
+public class SuperActivity extends OneToOneConnectedActivityCluster implements IActivity {
+ private static final long serialVersionUID = 1L;
+ private final ActivityId activityId;
+
+ public SuperActivity(ActivityClusterGraph acg, ActivityClusterId id, ActivityId activityId) {
+ super(acg, id);
+ this.activityId = activityId;
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ final IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
+ throws HyracksDataException {
+ final Map<ActivityId, IActivity> startActivities = new HashMap<ActivityId, IActivity>();
+ Map<ActivityId, IActivity> activities = getActivityMap();
+ for (Entry<ActivityId, IActivity> entry : activities.entrySet()) {
+ /**
+ * extract start activities
+ */
+ List<IConnectorDescriptor> conns = getActivityInputMap().get(entry.getKey());
+ if (conns == null || conns.size() == 0) {
+ startActivities.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ IRecordDescriptorProvider wrappedRecDescProvider = new IRecordDescriptorProvider() {
+
+ @Override
+ public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
+ if (startActivities.get(aid) != null) {
+ int superActivityInputChannel = SuperActivity.this.getClusterInputIndex(Pair.of(aid, inputIndex));
+ return recordDescProvider.getInputRecordDescriptor(activityId, superActivityInputChannel);
+ } else if (SuperActivity.this.getActivityMap().get(aid) != null) {
+ IConnectorDescriptor conn = getActivityInputMap().get(aid).get(inputIndex);
+ return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ } else {
+ ActivityClusterGraph acg = SuperActivity.this.getActivityClusterGraph();
+ for (Entry<ActivityClusterId, ActivityCluster> entry : acg.getActivityClusterMap().entrySet()) {
+ ActivityCluster ac = entry.getValue();
+ for (Entry<ActivityId, IActivity> saEntry : ac.getActivityMap().entrySet()) {
+ SuperActivity sa = (SuperActivity) saEntry.getValue();
+ if (sa.getActivityMap().get(aid) != null) {
+ List<IConnectorDescriptor> conns = sa.getActivityInputMap().get(aid);
+ if (conns != null && conns.size() >= inputIndex) {
+ IConnectorDescriptor conn = conns.get(inputIndex);
+ return sa.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ } else {
+ int superActivityInputChannel = sa.getClusterInputIndex(Pair.of(aid, inputIndex));
+ return recordDescProvider.getInputRecordDescriptor(sa.getActivityId(),
+ superActivityInputChannel);
+ }
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+ @Override
+ public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
+ RecordDescriptor providedDesc = recordDescProvider.getOutputRecordDescriptor(aid, outputIndex);
+ if (providedDesc != null) {
+ return providedDesc;
+ } else {
+ IConnectorDescriptor conn = getActivityOutputMap().get(aid).get(outputIndex);
+ return getConnectorRecordDescriptorMap().get(conn.getConnectorId());
+ }
+ }
+
+ };
+ return new SuperActivityOperatorNodePushable(this, startActivities, ctx, wrappedRecDescProvider, partition,
+ nPartitions);
+ }
+
+ @Override
+ public ActivityId getActivityId() {
+ return activityId;
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
new file mode 100644
index 0000000..d9a1551
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.api.rewriter.runtime;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.ActivityId;
+import edu.uci.ics.hyracks.api.dataflow.IActivity;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+
+public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
+ private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
+ private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+ private final Map<ActivityId, IActivity> startActivities;
+ private final SuperActivity parent;
+ private final IHyracksTaskContext ctx;
+ private final IRecordDescriptorProvider recordDescProvider;
+ private final int partition;
+ private final int nPartitions;
+ private int inputArity = 0;
+ private boolean initialized = false;
+
+ public SuperActivityOperatorNodePushable(SuperActivity parent, Map<ActivityId, IActivity> startActivities,
+ IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ this.parent = parent;
+ this.startActivities = startActivities;
+ this.ctx = ctx;
+ this.recordDescProvider = recordDescProvider;
+ this.partition = partition;
+ this.nPartitions = nPartitions;
+ }
+
+ @Override
+ public synchronized void initialize() throws HyracksDataException {
+ init();
+ /**
+ * initialize operator node pushables in the BFS order
+ */
+ for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
+ op.initialize();
+ }
+
+ }
+
+ public void init() throws HyracksDataException {
+ if (initialized) {
+ return;
+ }
+
+ Map<ActivityId, IOperatorNodePushable> startOperatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
+ Queue<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>> childQueue = new LinkedList<Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>>>();
+ List<IConnectorDescriptor> outputConnectors = null;
+
+ /**
+ * Set up the source operators
+ */
+ for (Entry<ActivityId, IActivity> entry : startActivities.entrySet()) {
+ IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
+ nPartitions);
+ startOperatorNodePushables.put(entry.getKey(), opPushable);
+ operatprNodePushablesBFSOrder.add(opPushable);
+ operatorNodePushables.put(entry.getKey(), opPushable);
+ inputArity += opPushable.getInputArity();
+ outputConnectors = parent.getActivityOutputMap().get(entry.getKey());
+ if (outputConnectors != null) {
+ for (IConnectorDescriptor conn : outputConnectors) {
+ childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+ }
+ }
+ }
+
+ /**
+ * Using BFS (breadth-first search) to construct to runtime execution DAG;
+ */
+ while (childQueue.size() > 0) {
+ /**
+ * expend the executing activities further to the downstream
+ */
+ if (outputConnectors != null && outputConnectors.size() > 0) {
+ for (IConnectorDescriptor conn : outputConnectors) {
+ childQueue.add(parent.getConnectorActivityMap().get(conn.getConnectorId()));
+ }
+ }
+
+ /**
+ * construct the source to destination information
+ */
+ Pair<Pair<IActivity, Integer>, Pair<IActivity, Integer>> channel = childQueue.poll();
+ ActivityId sourceId = channel.getLeft().getLeft().getActivityId();
+ int outputChannel = channel.getLeft().getRight();
+ ActivityId destId = channel.getRight().getLeft().getActivityId();
+ int inputChannel = channel.getRight().getRight();
+ IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
+ IOperatorNodePushable destOp = operatorNodePushables.get(destId);
+ if (destOp == null) {
+ destOp = channel.getRight().getLeft()
+ .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
+ operatprNodePushablesBFSOrder.add(destOp);
+ operatorNodePushables.put(destId, destOp);
+ }
+
+ /**
+ * construct the dataflow connection from a producer to a consumer
+ */
+ sourceOp.setOutputFrameWriter(outputChannel, destOp.getInputFrameWriter(inputChannel),
+ recordDescProvider.getInputRecordDescriptor(destId, inputChannel));
+
+ /**
+ * traverse to the child of the current activity
+ */
+ outputConnectors = parent.getActivityOutputMap().get(destId);
+ }
+
+ initialized = true;
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ /**
+ * de-initialize operator node pushables
+ */
+ for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
+ op.deinitialize();
+ }
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public synchronized void setOutputFrameWriter(int clusterOutputIndex, IFrameWriter writer,
+ RecordDescriptor recordDesc) {
+ try {
+ init();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ /**
+ * set the right output frame writer
+ */
+ Pair<ActivityId, Integer> activityIdOutputIndex = parent.getActivityIdOutputIndex(clusterOutputIndex);
+ IOperatorNodePushable opPushable = operatorNodePushables.get(activityIdOutputIndex.getLeft());
+ opPushable.setOutputFrameWriter(activityIdOutputIndex.getRight(), writer, recordDesc);
+ }
+
+ @Override
+ public synchronized IFrameWriter getInputFrameWriter(final int index) {
+ try {
+ init();
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+
+ /**
+ * get the right IFrameWriter from the cluster input index
+ */
+ Pair<ActivityId, Integer> activityIdInputIndex = parent.getActivityIdInputIndex(index);
+ IOperatorNodePushable operatorNodePushable = operatorNodePushables.get(activityIdInputIndex.getLeft());
+ final IFrameWriter writer = operatorNodePushable.getInputFrameWriter(activityIdInputIndex.getRight());
+
+ return new IFrameWriter() {
+
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ writer.fail();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ writer.close();
+ }
+
+ };
+ }
+
+ @Override
+ public String getDisplayName() {
+ return "Meta Activity";
+ }
+
+}