ASTERIXDB-1206: call initialize()/deinitialize() in parallel for OperatorNodePushables in SuperActivityOperatorNodePushable.
Change-Id: I8700d5258d658ebbf711b4233bb0def1e8cf7c39
Reviewed-on: https://asterix-gerrit.ics.uci.edu/526
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Jianfeng Jia <jianfeng.jia@gmail.com>
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 95c36ea..6023323 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.api.context;
+import java.util.concurrent.ExecutorService;
+
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataset.IDatasetPartitionManager;
import org.apache.hyracks.api.deployment.DeploymentId;
@@ -26,14 +28,16 @@
import org.apache.hyracks.api.job.profiling.counters.ICounterContext;
import org.apache.hyracks.api.resources.IDeallocatableRegistry;
-public interface IHyracksTaskContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry,
- IOperatorEnvironment {
+public interface IHyracksTaskContext
+ extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry, IOperatorEnvironment {
public IHyracksJobletContext getJobletContext();
public TaskAttemptId getTaskAttemptId();
public ICounterContext getCounterContext();
+ public ExecutorService getExecutorService();
+
public IDatasetPartitionManager getDatasetPartitionManager();
public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId, String nodeId) throws Exception;
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
index 8213fd9..4e842bb 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/rewriter/runtime/SuperActivityOperatorNodePushable.java
@@ -26,9 +26,10 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;
-
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
@@ -42,12 +43,12 @@
/**
* The runtime of a SuperActivity, which internally executes a DAG of one-to-one
* connected activities in a single thread.
- *
+ *
* @author yingyib
*/
public class SuperActivityOperatorNodePushable implements IOperatorNodePushable {
private final Map<ActivityId, IOperatorNodePushable> operatorNodePushables = new HashMap<ActivityId, IOperatorNodePushable>();
- private final List<IOperatorNodePushable> operatprNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
+ private final List<IOperatorNodePushable> operatorNodePushablesBFSOrder = new ArrayList<IOperatorNodePushable>();
private final Map<ActivityId, IActivity> startActivities;
private final SuperActivity parent;
private final IHyracksTaskContext ctx;
@@ -78,12 +79,8 @@
@Override
public void initialize() throws HyracksDataException {
- /**
- * initialize operator node pushables in the BFS order
- */
- for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
- op.initialize();
- }
+ // Initializes all OperatorNodePushables in parallel.
+ runInParallel(op -> op.initialize());
}
public void init() throws HyracksDataException {
@@ -98,7 +95,7 @@
IOperatorNodePushable opPushable = entry.getValue().createPushRuntime(ctx, recordDescProvider, partition,
nPartitions);
startOperatorNodePushables.put(entry.getKey(), opPushable);
- operatprNodePushablesBFSOrder.add(opPushable);
+ operatorNodePushablesBFSOrder.add(opPushable);
operatorNodePushables.put(entry.getKey(), opPushable);
inputArity += opPushable.getInputArity();
outputConnectors = parent.getActivityOutputMap().get(entry.getKey());
@@ -136,9 +133,9 @@
IOperatorNodePushable sourceOp = operatorNodePushables.get(sourceId);
IOperatorNodePushable destOp = operatorNodePushables.get(destId);
if (destOp == null) {
- destOp = channel.getRight().getLeft()
- .createPushRuntime(ctx, recordDescProvider, partition, nPartitions);
- operatprNodePushablesBFSOrder.add(destOp);
+ destOp = channel.getRight().getLeft().createPushRuntime(ctx, recordDescProvider, partition,
+ nPartitions);
+ operatorNodePushablesBFSOrder.add(destOp);
operatorNodePushables.put(destId, destOp);
}
@@ -157,12 +154,8 @@
@Override
public void deinitialize() throws HyracksDataException {
- /**
- * de-initialize operator node pushables
- */
- for (IOperatorNodePushable op : operatprNodePushablesBFSOrder) {
- op.deinitialize();
- }
+ // De-initialize all OperatorNodePushables in parallel.
+ runInParallel(op -> op.deinitialize());
}
@Override
@@ -197,4 +190,30 @@
return "Super Activity " + parent.getActivityMap().values().toString();
}
+ interface OperatorNodePushableAction {
+ public void runAction(IOperatorNodePushable op) throws HyracksDataException;
+ }
+
+ private void runInParallel(OperatorNodePushableAction opAction) throws HyracksDataException {
+ List<Future<Void>> initializationTasks = new ArrayList<Future<Void>>();
+ // Run one action for all OperatorNodePushables in parallel through a thread pool.
+ for (final IOperatorNodePushable op : operatorNodePushablesBFSOrder) {
+ initializationTasks.add(ctx.getExecutorService().submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ opAction.runAction(op);
+ return null;
+ }
+ }));
+ }
+
+        // Waits until all parallel actions finish.
+ for (Future<Void> initializationTask : initializationTasks) {
+ try {
+ initializationTask.get();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
index 0034bc9..fe2794c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Joblet.java
@@ -218,7 +218,7 @@
LOGGER.warning("Freeing leaked " + stillAllocated + " bytes");
appCtx.getMemoryManager().deallocate(stillAllocated);
}
- nodeController.getExecutor().execute(new Runnable() {
+ nodeController.getExecutorService().execute(new Runnable() {
@Override
public void run() {
deallocatableRegistry.close();
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index b96abf8..1a50211 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -35,7 +35,6 @@
import java.util.StringTokenizer;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -178,8 +177,8 @@
queue = new WorkQueue(Thread.NORM_PRIORITY); // Reserves MAX_PRIORITY of the heartbeat thread.
jobletMap = new Hashtable<JobId, Joblet>();
timer = new Timer(true);
- serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER, new File(new File(
- NodeControllerService.class.getName()), id));
+ serverCtx = new ServerContext(ServerContext.ServerType.NODE_CONTROLLER,
+ new File(new File(NodeControllerService.class.getName()), id));
memoryMXBean = ManagementFactory.getMemoryMXBean();
gcMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
threadMXBean = ManagementFactory.getThreadMXBean();
@@ -276,8 +275,8 @@
}
ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
- runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
- .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+ runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
@@ -322,8 +321,8 @@
if (className != null) {
Class<?> c = Class.forName(className);
ncAppEntryPoint = (INCApplicationEntryPoint) c.newInstance();
- String[] args = ncConfig.appArgs == null ? new String[0] : ncConfig.appArgs
- .toArray(new String[ncConfig.appArgs.size()]);
+ String[] args = ncConfig.appArgs == null ? new String[0]
+ : ncConfig.appArgs.toArray(new String[ncConfig.appArgs.size()]);
ncAppEntryPoint.start(appCtx, args);
}
executor = Executors.newCachedThreadPool(appCtx.getThreadFactory());
@@ -334,7 +333,7 @@
if (!shuttedDown) {
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
executor.shutdownNow();
- if(!executor.awaitTermination(10, TimeUnit.SECONDS)){
+ if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
LOGGER.log(Level.SEVERE, "Some jobs failed to exit, continuing shutdown abnormally");
}
partitionManager.close();
@@ -382,7 +381,7 @@
return nodeParameters;
}
- public Executor getExecutor() {
+ public ExecutorService getExecutorService() {
return executor;
}
@@ -489,15 +488,14 @@
switch (fn.getFunctionId()) {
case SEND_APPLICATION_MESSAGE: {
CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
- queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(), amf
- .getDeploymentId(), amf.getNodeId()));
+ queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
+ amf.getDeploymentId(), amf.getNodeId()));
return;
}
case START_TASKS: {
CCNCFunctions.StartTasksFunction stf = (CCNCFunctions.StartTasksFunction) fn;
- queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(),
- stf.getJobId(), stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(),
- stf.getFlags()));
+ queue.schedule(new StartTasksWork(NodeControllerService.this, stf.getDeploymentId(), stf.getJobId(),
+ stf.getPlanBytes(), stf.getTaskDescriptors(), stf.getConnectorPolicies(), stf.getFlags()));
return;
}
@@ -515,8 +513,8 @@
case REPORT_PARTITION_AVAILABILITY: {
CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
- queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this, rpaf
- .getPartitionId(), rpaf.getNetworkAddress()));
+ queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
+ rpaf.getPartitionId(), rpaf.getNetworkAddress()));
return;
}
@@ -534,8 +532,8 @@
case DEPLOY_BINARY: {
CCNCFunctions.DeployBinaryFunction ndbf = (CCNCFunctions.DeployBinaryFunction) fn;
- queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(), ndbf
- .getBinaryURLs()));
+ queue.schedule(new DeployBinaryWork(NodeControllerService.this, ndbf.getDeploymentId(),
+ ndbf.getBinaryURLs()));
return;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 7178ce6..9a3582a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -26,14 +26,14 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
-import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.comm.IFrameReader;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.PartitionChannel;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -69,7 +69,7 @@
private final String displayName;
- private final Executor executor;
+ private final ExecutorService executorService;
private final IWorkspaceFileFactory fileFactory;
@@ -89,20 +89,18 @@
private final List<Exception> exceptions;
- private List<Throwable> caughtExceptions;
-
private volatile boolean aborted;
private NodeControllerService ncs;
private List<List<PartitionChannel>> inputChannelsFromConnectors;
- public Task(Joblet joblet, TaskAttemptId taskId, String displayName, Executor executor, NodeControllerService ncs,
- List<List<PartitionChannel>> inputChannelsFromConnectors) {
+ public Task(Joblet joblet, TaskAttemptId taskId, String displayName, ExecutorService executor,
+ NodeControllerService ncs, List<List<PartitionChannel>> inputChannelsFromConnectors) {
this.joblet = joblet;
this.taskAttemptId = taskId;
this.displayName = displayName;
- this.executor = executor;
+ this.executorService = executor;
fileFactory = new WorkspaceFileFactory(this, (IOManager) joblet.getIOManager());
deallocatableRegistry = new DefaultDeallocatableRegistry();
counterMap = new HashMap<String, Counter>();
@@ -151,6 +149,11 @@
}
@Override
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ @Override
public FileReference createUnmanagedWorkspaceFile(String prefix) throws HyracksDataException {
return fileFactory.createUnmanagedWorkspaceFile(prefix);
}
@@ -215,7 +218,7 @@
public void start() throws HyracksException {
aborted = false;
- executor.execute(this);
+ executorService.execute(this);
}
public synchronized void abort() {
@@ -261,7 +264,7 @@
final IFrameWriter writer = operator.getInputFrameWriter(i);
sem.acquire();
final int cIdx = i;
- executor.execute(new Runnable() {
+ executorService.execute(new Runnable() {
@Override
public void run() {
if (aborted) {
@@ -334,7 +337,7 @@
writer.open();
try {
VSizeFrame frame = new VSizeFrame(this);
- while( reader.nextFrame(frame)){
+ while (reader.nextFrame(frame)) {
if (aborted) {
return;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 0eefe31..3e8a90c 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -53,7 +53,7 @@
}
final List<IPartition> unregisteredPartitions = new ArrayList<IPartition>();
ncs.getPartitionManager().unregisterPartitions(jobId, unregisteredPartitions);
- ncs.getExecutor().execute(new Runnable() {
+ ncs.getExecutorService().execute(new Runnable() {
@Override
public void run() {
for (IPartition p : unregisteredPartitions) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index 745c2ab..b3468b0 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -129,7 +129,7 @@
}
final int partition = tid.getPartition();
List<IConnectorDescriptor> inputs = ac.getActivityInputMap().get(aid);
- Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutor(), ncs,
+ Task task = new Task(joblet, taId, han.getClass().getName(), ncs.getExecutorService(), ncs,
createInputChannels(td, inputs));
IOperatorNodePushable operator = han.createPushRuntime(task, rdp, partition, td.getPartitionCount());
@@ -201,7 +201,7 @@
td.getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
- task.getTaskAttemptId(), ncs.getExecutor());
+ task.getTaskAttemptId(), ncs.getExecutorService());
} else {
return collector;
}
@@ -217,7 +217,7 @@
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
- conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+ conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
}
};
} else {
@@ -225,7 +225,7 @@
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
- jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
+ jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutorService());
}
};
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
new file mode 100644
index 0000000..ed7c08d
--- /dev/null
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.rewriting;
+
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.tests.integration.AbstractIntegrationTest;
+import org.junit.Test;
+
+public class SuperActivityRewritingTest extends AbstractIntegrationTest {
+
+ @Test
+ public void testScanUnion() throws Exception {
+ JobSpecification spec = new JobSpecification();
+
+ DummySourceOperatorDescriptor ets1 = new DummySourceOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets1, NC1_ID);
+
+ DummySourceOperatorDescriptor ets2 = new DummySourceOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets2, NC1_ID);
+
+ DummySourceOperatorDescriptor ets3 = new DummySourceOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ets3, NC1_ID);
+
+ ThreadCountingOperatorDescriptor tc = new ThreadCountingOperatorDescriptor(spec, 3);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, tc, NC1_ID);
+
+ spec.connect(new OneToOneConnectorDescriptor(spec), ets1, 0, tc, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), ets2, 0, tc, 1);
+ spec.connect(new OneToOneConnectorDescriptor(spec), ets3, 0, tc, 2);
+ spec.addRoot(tc);
+ runTest(spec);
+ }
+
+}
+
+class DummySourceOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public DummySourceOperatorDescriptor(JobSpecification spec) {
+ super(spec, 0, 1);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+ @Override
+ public void initialize() throws HyracksDataException {
+ try {
+ writer.open();
+ writer.close();
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ };
+ }
+
+}
+
+class ThreadCountingOperatorDescriptor extends AbstractOperatorDescriptor {
+ private static final long serialVersionUID = 1L;
+
+ public ThreadCountingOperatorDescriptor(IOperatorDescriptorRegistry spec, int inputArity) {
+ super(spec, inputArity, 0);
+ }
+
+ @Override
+ public void contributeActivities(IActivityGraphBuilder builder) {
+ ThreadCountingActivityNode tca = new ThreadCountingActivityNode(new ActivityId(getOperatorId(), 0));
+ builder.addActivity(this, tca);
+ for (int i = 0; i < inputArity; ++i) {
+ builder.addSourceEdge(i, tca, i);
+ }
+ }
+
+ private class ThreadCountingActivityNode extends AbstractActivityNode {
+ private static final long serialVersionUID = 1L;
+
+ public ThreadCountingActivityNode(ActivityId id) {
+ super(id);
+ }
+
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
+ throws HyracksDataException {
+ return new IOperatorNodePushable() {
+ private Set<Long> threads = new HashSet<Long>();
+
+ @Override
+ public void initialize() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void deinitialize() throws HyracksDataException {
+ if (threads.size() != inputArity) {
+ throw new HyracksDataException("The number of worker threads is not as expected");
+ }
+ }
+
+ @Override
+ public int getInputArity() {
+ return inputArity;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc)
+ throws HyracksDataException {
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public IFrameWriter getInputFrameWriter(int index) {
+ return new IFrameWriter() {
+ @Override
+ public void open() throws HyracksDataException {
+ synchronized (threads) {
+ threads.add(Thread.currentThread().getId());
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+ };
+ }
+
+ @Override
+ public String getDisplayName() {
+ return "Thread-Counting-Activity";
+ }
+
+ };
+ }
+ }
+
+}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
index fb036d8..d8cf599 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestTaskContext.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.test.support;
import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -59,7 +60,7 @@
@Override
public ByteBuffer reallocateFrame(ByteBuffer tobeDeallocate, int newSizeInBytes, boolean copyOldData)
throws HyracksDataException {
- return jobletContext.reallocateFrame(tobeDeallocate,newSizeInBytes, copyOldData);
+ return jobletContext.reallocateFrame(tobeDeallocate, newSizeInBytes, copyOldData);
}
@@ -133,4 +134,9 @@
// TODO Auto-generated method stub
}
+
+ @Override
+ public ExecutorService getExecutorService() {
+ return null;
+ }
}