ASTERIXDB-1206: call initialize()/deinitialize() in parallel for OperatorNodePushables in SuperActivityOperatorNodePushable.

Change-Id: I8700d5258d658ebbf711b4233bb0def1e8cf7c39
Reviewed-on: https://asterix-gerrit.ics.uci.edu/526
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Jianfeng Jia <jianfeng.jia@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 95c36ea..6023323 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.api.context;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
 import org.apache.hyracks.api.deployment.DeploymentId;
@@ -26,14 +28,16 @@
 import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
 import org.apache.hyracks.api.resources.IDeallocatableRegistry;
 
-public interface IHyracksTaskContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry,
-        IOperatorEnvironment {
+public interface IHyracksTaskContext
+        extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry, IOperatorEnvironment {
     public IHyracksJobletContext getJobletContext();
 
     public TaskAttemptId getTaskAttemptId();
 
     public ICounterContext getCounterContext();
 
+    public ExecutorService getExecutorService();
+
     public IDatasetPartitionManager getDatasetPartitionManager();
 
     public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId, String nodeId) throws Exception;
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 8213fd9..4e842bb 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -26,9 +26,10 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.tuple.Pair;
-
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
@@ -42,12 +43,12 @@
 /**
  * The runtime of a SuperActivity, which internally executes a DAG of one-to-one
  * connected activities in a single thread.
- * 
+ *
  * @author yingyib
  */
 public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
     private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
-    private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+    private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
     private final Map<ActivityId, IActivity> startActivities;
     private final SuperActivity parent;
     private final IHyracksTaskContext ctx;
@@ -78,12 +79,8 @@
 
     @Override
     public void initialize() throws HyracksDataException {
-        /**
-         * initialize operator node pushables in the BFS order
-         */
-        for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
-            op.initialize();
-        }
+        // Initializes all OperatorNodePushables in parallel.
+        runInParallel(op -> op.initialize());
     }
 
     public void init() throws HyracksDataException {
@@ -98,7 +95,7 @@
             IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
                     nPartitions);
             startOperatorNodePushables.put(entry.getKey(), opPushable);
-            operatprNodePushablesBFSOrder.add(opPushable);
+            operatorNodePushablesBFSOrder.add(opPushable);
             operatorNodePushables.put(entry.getKey(), opPushable);
             inputArity += opPushable.getInputArity();
             outputConnectors = parent.getActivityOutputMap().get(entry.getKey());
@@ -136,9 +133,9 @@
             IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
             IOperatorNodePushable destOp = operatorNodePushables.get(destId);
             if (destOp == null) {
-                destOp = channel.getRight().getLeft()
-                        .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
-                operatprNodePushablesBFSOrder.add(destOp);
+                destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
+                        nPartitions);
+                operatorNodePushablesBFSOrder.add(destOp);
                 operatorNodePushables.put(destId, destOp);
             }
 
@@ -157,12 +154,8 @@
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        /**
-         * de-initialize operator node pushables
-         */
-        for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
-            op.deinitialize();
-        }
+        // De-initialize all OperatorNodePushables in parallel.
+        runInParallel(op -> op.deinitialize());
     }
 
     @Override
@@ -197,4 +190,30 @@
         return "Super Activity " + parent.getActivityMap().values().toString();
     }
 
+    interface OperatorNodePushableAction {
+        public void runAction(IOperatorNodePushable op) throws HyracksDataException;
+    }
+
+    private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
+        List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>();
+        // Run one action for all OperatorNodePushables in parallel through a thread pool.
+        for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
+            initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    opAction.runAction(op);
+                    return null;
+                }
+            }));
+        }
+
+        // Waits until all parallel actions to finish.
+        for (Future<Void> initializationTask : initializationTasks) {
+            try {
+                initializationTask.get();
+            } catch (Exception e) {
+                throw new HyracksDataException(e);
+            }
+        }
+    }
 }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 0034bc9..fe2794c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -218,7 +218,7 @@
             LOGGER.warning("Freeing leaked " + stillAllocated + " bytes");
             appCtx.getMemoryManager().deallocate(stillAllocated);
         }
-        nodeController.getExecutor().execute(new Runnable() {
+        nodeController.getExecutorService().execute(new Runnable() {
             @Override
             public void run() {
                 deallocatableRegistry.close();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b96abf8..1a50211 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -35,7 +35,6 @@
 import java.util.StringTokenizer;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -178,8 +177,8 @@
         queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
         jobletMap = new Hashtable<JobId, Joblet>();
         timer = new Timer(true);
-        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
-                NodeControllerService.class.getName()), id));
+        serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
+                new File(new File(NodeControllerService.class.getName()), id));
         memoryMXBean = ManagementFactory.getMemoryMXBean();
         gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
         threadMXBean = ManagementFactory.getThreadMXBean();
@@ -276,8 +275,8 @@
         }
         ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
                 osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
-                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
-                        .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+                runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
                 runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
@@ -322,8 +321,8 @@
         if (className != null) {
             Class<?> c = Class.forName(className);
             ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
-            String[] args = ncConfig.appArgs == null ? new String[0] : ncConfig.appArgs
-                    .toArray(new String[ncConfig.appArgs.size()]);
+            String[] args = ncConfig.appArgs == null ? new String[0]
+                    : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
             ncAppEntryPoint.start(appCtx, args);
         }
         executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
@@ -334,7 +333,7 @@
         if (!shuttedDown) {
             LOGGER.log(Level.INFO, "Stopping NodeControllerService");
             executor.shutdownNow();
-            if(!executor.awaitTermination(10, TimeUnit.SECONDS)){
+            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                 LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally");
             }
             partitionManager.close();
@@ -382,7 +381,7 @@
         return nodeParameters;
     }
 
-    public Executor getExecutor() {
+    public ExecutorService getExecutorService() {
         return executor;
     }
 
@@ -489,15 +488,14 @@
             switch (fn.getFunctionId()) {
                 case SEND_APPLICATION_MESSAGE: {
                     CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
-                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
-                            .getDeploymentId(), amf.getNodeId()));
+                    queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
+                            amf.getDeploymentId(), amf.getNodeId()));
                     return;
                 }
                 case START_TASKS: {
                     CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
-                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
-                            stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
-                            stf.getFlags()));
+                    queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(),
+                            stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
                     return;
                 }
 
@@ -515,8 +513,8 @@
 
                 case REPORT_PARTITION_AVAILABILITY: {
                     CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
-                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
-                            .getPartitionId(), rpaf.getNetworkAddress()));
+                    queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
+                            rpaf.getPartitionId(), rpaf.getNetworkAddress()));
                     return;
                 }
 
@@ -534,8 +532,8 @@
 
                 case DEPLOY_BINARY: {
                     CCNCFunctions.DeployBinaryFunction ndbf = (CCNCFunctions.DeployBinaryFunction) fn;
-                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(), ndbf
-                            .getBinaryURLs()));
+                    queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(),
+                            ndbf.getBinaryURLs()));
                     return;
                 }
 
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7178ce6..9a3582a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -26,14 +26,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.comm.IFrameReader;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.IPartitionCollector;
 import org.apache.hyracks.api.comm.PartitionChannel;
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -69,7 +69,7 @@
 
     private final String displayName;
 
-    private final Executor executor;
+    private final ExecutorService executorService;
 
     private final IWorkspaceFileFactory fileFactory;
 
@@ -89,20 +89,18 @@
 
     private final List<Exception> exceptions;
 
-    private List<Throwable> caughtExceptions;
-
     private volatile boolean aborted;
 
     private NodeControllerService ncs;
 
     private List<List<PartitionChannel>> inputChannelsFromConnectors;
 
-    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs,
-            List<List<PartitionChannel>> inputChannelsFromConnectors) {
+    public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
+            NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
         this.joblet = joblet;
         this.taskAttemptId = taskId;
         this.displayName = displayName;
-        this.executor = executor;
+        this.executorService = executor;
         fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
         deallocatableRegistry = new DefaultDeallocatableRegistry();
         counterMap = new HashMap<String, Counter>();
@@ -151,6 +149,11 @@
     }
 
     @Override
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    @Override
     public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
         return fileFactory.createUnmanagedWorkspaceFile(prefix);
     }
@@ -215,7 +218,7 @@
 
     public void start() throws HyracksException {
         aborted = false;
-        executor.execute(this);
+        executorService.execute(this);
     }
 
     public synchronized void abort() {
@@ -261,7 +264,7 @@
                         final IFrameWriter writer = operator.getInputFrameWriter(i);
                         sem.acquire();
                         final int cIdx = i;
-                        executor.execute(new Runnable() {
+                        executorService.execute(new Runnable() {
                             @Override
                             public void run() {
                                 if (aborted) {
@@ -334,7 +337,7 @@
                     writer.open();
                     try {
                         VSizeFrame frame = new VSizeFrame(this);
-                        while( reader.nextFrame(frame)){
+                        while (reader.nextFrame(frame)) {
                             if (aborted) {
                                 return;
                             }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 0eefe31..3e8a90c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -53,7 +53,7 @@
         }
         final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
         ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
-        ncs.getExecutor().execute(new Runnable() {
+        ncs.getExecutorService().execute(new Runnable() {
             @Override
             public void run() {
                 for (IPartition p : unregisteredPartitions) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 745c2ab..b3468b0 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -129,7 +129,7 @@
                 }
                 final int partition = tid.getPartition();
                 List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
-                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+                Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
                         createInputChannels(td, inputs));
                 IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
 
@@ -201,7 +201,7 @@
                 td.getInputPartitionCounts()[i], td.getPartitionCount());
         if (cPolicy.materializeOnReceiveSide()) {
             return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
-                    task.getTaskAttemptId(), ncs.getExecutor());
+                    task.getTaskAttemptId(), ncs.getExecutorService());
         } else {
             return collector;
         }
@@ -217,7 +217,7 @@
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                         return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
-                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                                conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
                     }
                 };
             } else {
@@ -225,7 +225,7 @@
                     @Override
                     public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                         return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
-                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+                                jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
                     }
                 };
             }
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
new file mode 100644
index 0000000..ed7c08d
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.rewriting;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.junit.Test;
+
+public class SuperActivityRewritingTest extends AbstractIntegrationTest {
+
+    @Test
+    public void testScanUnion() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        DummySourceOperatorDescriptor ets1 = new DummySourceOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets1, NC1_ID);
+
+        DummySourceOperatorDescriptor ets2 = new DummySourceOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets2, NC1_ID);
+
+        DummySourceOperatorDescriptor ets3 = new DummySourceOperatorDescriptor(spec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets3, NC1_ID);
+
+        ThreadCountingOperatorDescriptor tc = new ThreadCountingOperatorDescriptor(spec, 3);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, tc, NC1_ID);
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), ets1, 0, tc, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), ets2, 0, tc, 1);
+        spec.connect(new OneToOneConnectorDescriptor(spec), ets3, 0, tc, 2);
+        spec.addRoot(tc);
+        runTest(spec);
+    }
+
+}
+
+class DummySourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public DummySourceOperatorDescriptor(JobSpecification spec) {
+        super(spec, 0, 1);
+    }
+
+    @Override
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+        return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+            @Override
+            public void initialize() throws HyracksDataException {
+                try {
+                    writer.open();
+                    writer.close();
+                } catch (Exception e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        };
+    }
+
+}
+
+class ThreadCountingOperatorDescriptor extends AbstractOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public ThreadCountingOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity) {
+        super(spec, inputArity, 0);
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        ThreadCountingActivityNode tca = new ThreadCountingActivityNode(new ActivityId(getOperatorId(), 0));
+        builder.addActivity(this, tca);
+        for (int i = 0; i < inputArity; ++i) {
+            builder.addSourceEdge(i, tca, i);
+        }
+    }
+
+    private class ThreadCountingActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public ThreadCountingActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+                        throws HyracksDataException {
+            return new IOperatorNodePushable() {
+                private Set<Long> threads = new HashSet<Long>();
+
+                @Override
+                public void initialize() throws HyracksDataException {
+
+                }
+
+                @Override
+                public void deinitialize() throws HyracksDataException {
+                    if (threads.size() != inputArity) {
+                        throw new HyracksDataException("The number of worker threads is not as expected");
+                    }
+                }
+
+                @Override
+                public int getInputArity() {
+                    return inputArity;
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+                        throws HyracksDataException {
+                    throw new IllegalStateException();
+                }
+
+                @Override
+                public IFrameWriter getInputFrameWriter(int index) {
+                    return new IFrameWriter() {
+                        @Override
+                        public void open() throws HyracksDataException {
+                            synchronized (threads) {
+                                threads.add(Thread.currentThread().getId());
+                            }
+                        }
+
+                        @Override
+                        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
+                        }
+
+                        @Override
+                        public void fail() throws HyracksDataException {
+
+                        }
+
+                        @Override
+                        public void close() throws HyracksDataException {
+
+                        }
+                    };
+                }
+
+                @Override
+                public String getDisplayName() {
+                    return "Thread-Counting-Activity";
+                }
+
+            };
+        }
+    }
+
+}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index fb036d8..d8cf599 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -19,6 +19,7 @@
 package org.apache.hyracks.test.support;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -59,7 +60,7 @@
     @Override
     public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newSizeInBytes, boolean copyOldData)
             throws HyracksDataException {
-        return jobletContext.reallocateFrame(tobeDeallocate,newSizeInBytes, copyOldData);
+        return jobletContext.reallocateFrame(tobeDeallocate, newSizeInBytes, copyOldData);
 
     }
 
@@ -133,4 +134,9 @@
         // TODO Auto-generated method stub
 
     }
+
+    @Override
+    public ExecutorService getExecutorService() {
+        return null;
+    }
 }