merged hyracks_asterix_stabilization r1702:1723
git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_lsm_tree@1725 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
index f79a464..a8f2fda 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/channels/IInputChannel.java
@@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public interface IInputChannel {
@@ -29,7 +30,7 @@
public void recycleBuffer(ByteBuffer buffer);
- public void open() throws HyracksDataException;
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException;
public void close() throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
index 31c3d36..791f312 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/client/impl/JobSpecificationActivityClusterGraphGeneratorFactory.java
@@ -48,6 +48,7 @@
ActivityClusterGraphBuilder acgb = new ActivityClusterGraphBuilder();
final ActivityClusterGraph acg = acgb.inferActivityClusters(jobId, jag);
+ acg.setFrameSize(spec.getFrameSize());
acg.setMaxReattempts(spec.getMaxReattempts());
acg.setJobletEventListenerFactory(spec.getJobletEventListenerFactory());
acg.setGlobalJobDataFactory(spec.getGlobalJobDataFactory());
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
index 62112f5..9cb8f10 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksCommonContext.java
@@ -19,9 +19,9 @@
import edu.uci.ics.hyracks.api.io.IIOManager;
public interface IHyracksCommonContext {
- public ByteBuffer allocateFrame();
-
public int getFrameSize();
public IIOManager getIOManager();
+
+ public ByteBuffer allocateFrame();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
index 2999648..fad4300 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksJobletContext.java
@@ -20,7 +20,7 @@
import edu.uci.ics.hyracks.api.job.profiling.counters.ICounterContext;
import edu.uci.ics.hyracks.api.resources.IDeallocatableRegistry;
-public interface IHyracksJobletContext extends IHyracksCommonContext, IWorkspaceFileFactory, IDeallocatableRegistry {
+public interface IHyracksJobletContext extends IWorkspaceFileFactory, IDeallocatableRegistry {
public INCApplicationContext getApplicationContext();
public JobId getJobId();
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
index 2ad2e7f..a94c6de 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/context/IHyracksRootContext.java
@@ -14,5 +14,8 @@
*/
package edu.uci.ics.hyracks.api.context;
-public interface IHyracksRootContext extends IHyracksCommonContext {
+import edu.uci.ics.hyracks.api.io.IIOManager;
+
+public interface IHyracksRootContext {
+ public IIOManager getIOManager();
}
\ No newline at end of file
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
index 7285956..dd68747 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/ActivityClusterGraph.java
@@ -37,6 +37,8 @@
private final Map<ConnectorDescriptorId, ActivityCluster> connectorMap;
+ private int frameSize;
+
private int maxReattempts;
private IJobletEventListenerFactory jobletEventListenerFactory;
@@ -48,6 +50,7 @@
activityClusterMap = new HashMap<ActivityClusterId, ActivityCluster>();
activityMap = new HashMap<ActivityId, ActivityCluster>();
connectorMap = new HashMap<ConnectorDescriptorId, ActivityCluster>();
+ frameSize = 32768;
}
public Map<ActivityId, ActivityCluster> getActivityMap() {
@@ -79,6 +82,14 @@
return version;
}
+ public void setFrameSize(int frameSize) {
+ this.frameSize = frameSize;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
public void setMaxReattempts(int maxReattempts) {
this.maxReattempts = maxReattempts;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
index 84ebedb..cf2eec2 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/job/JobSpecification.java
@@ -56,6 +56,8 @@
private IConnectorPolicyAssignmentPolicy connectorPolicyAssignmentPolicy;
+ private int frameSize;
+
private int maxReattempts;
private IJobletEventListenerFactory jobletEventListenerFactory;
@@ -77,6 +79,7 @@
userConstraints = new HashSet<Constraint>();
operatorIdCounter = 0;
connectorIdCounter = 0;
+ frameSize = 32768;
maxReattempts = 2;
}
@@ -210,6 +213,14 @@
this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy;
}
+ public void setFrameSize(int frameSize) {
+ this.frameSize = frameSize;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
public void setMaxReattempts(int maxReattempts) {
this.maxReattempts = maxReattempts;
}
diff --git a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
index 02eb891..c3dfad0 100644
--- a/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
+++ b/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/partitions/IPartition.java
@@ -15,9 +15,12 @@
package edu.uci.ics.hyracks.api.partitions;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.resources.IDeallocatable;
public interface IPartition extends IDeallocatable {
+ public IHyracksTaskContext getTaskContext();
+
public void writeTo(IFrameWriter writer);
public boolean isReusable();
diff --git a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 574f552..167eb4b 100644
--- a/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -37,9 +37,6 @@
@Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
public String dataIPAddress;
- @Option(name = "-frame-size", usage = "Frame Size to use for data communication (default: 32768)")
- public int frameSize = 32768;
-
@Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
public String ioDevices = System.getProperty("java.io.tmpdir");
@@ -55,6 +52,9 @@
@Option(name = "-net-thread-count", usage = "Number of threads to use for Network I/O (default: 1)")
public int nNetThreads = 1;
+ @Option(name = "-max-memory", usage = "Maximum memory usable at this Node Controller in bytes (default: -1 auto)")
+ public int maxMemory = -1;
+
public void toCommandLine(List<String> cList) {
cList.add("-cc-host");
cList.add(ccHost);
@@ -66,8 +66,6 @@
cList.add(nodeId);
cList.add("-data-ip-address");
cList.add(dataIPAddress);
- cList.add("-frame-size");
- cList.add(String.valueOf(frameSize));
cList.add("-iodevices");
cList.add(ioDevices);
cList.add("-dcache-client-servers");
@@ -80,5 +78,7 @@
cList.add(dcacheClientPath);
cList.add("-net-thread-count");
cList.add(String.valueOf(nNetThreads));
+ cList.add("-max-memory");
+ cList.add(String.valueOf(maxMemory));
}
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
index 63a45db..3855b4d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Joblet.java
@@ -80,6 +80,8 @@
private final IJobletEventListener jobletEventListener;
+ private final int frameSize;
+
private JobStatus cleanupStatus;
private boolean cleanupPending;
@@ -89,6 +91,7 @@
this.nodeController = nodeController;
this.appCtx = appCtx;
this.jobId = jobId;
+ this.frameSize = acg.getFrameSize();
this.acg = acg;
partitionRequestMap = new HashMap<PartitionId, IPartitionCollector>();
env = new OperatorEnvironmentImpl(nodeController.getId());
@@ -201,18 +204,15 @@
});
}
- @Override
- public ByteBuffer allocateFrame() {
- return appCtx.getRootContext().allocateFrame();
+ ByteBuffer allocateFrame() {
+ return ByteBuffer.allocate(getFrameSize());
}
- @Override
- public int getFrameSize() {
- return appCtx.getRootContext().getFrameSize();
+ int getFrameSize() {
+ return frameSize;
}
- @Override
- public IIOManager getIOManager() {
+ IIOManager getIOManager() {
return appCtx.getRootContext().getIOManager();
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index ecdd839..58a173c 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -129,12 +129,12 @@
NodeControllerIPCI ipci = new NodeControllerIPCI();
ipc = new IPCSystem(new InetSocketAddress(ncConfig.clusterNetIPAddress, 0), ipci,
new CCNCFunctions.SerializerDeserializer());
- this.ctx = new RootHyracksContext(ncConfig.frameSize, new IOManager(getDevices(ncConfig.ioDevices), executor));
+ this.ctx = new RootHyracksContext(new IOManager(getDevices(ncConfig.ioDevices), executor));
if (id == null) {
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(ctx, getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
+ netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -260,7 +260,7 @@
return executor;
}
- public NCConfig getConfiguration() throws Exception {
+ public NCConfig getConfiguration() {
return ncConfig;
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
index 257e2a8..1d5af84 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -23,7 +23,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
@@ -33,8 +33,6 @@
public class NetworkInputChannel implements IInputChannel {
private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
- private IHyracksRootContext ctx;
-
private final NetworkManager netManager;
private final SocketAddress remoteAddress;
@@ -51,9 +49,8 @@
private Object attachment;
- public NetworkInputChannel(IHyracksRootContext ctx, NetworkManager netManager, SocketAddress remoteAddress,
- PartitionId partitionId, int nBuffers) {
- this.ctx = ctx;
+ public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
+ int nBuffers) {
this.netManager = netManager;
this.remoteAddress = remoteAddress;
this.partitionId = partitionId;
@@ -88,7 +85,7 @@
}
@Override
- public void open() throws HyracksDataException {
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
try {
ccb = netManager.connect(remoteAddress);
} catch (Exception e) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index d328a74..b805595 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -23,12 +23,11 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.control.nc.partitions.IPartitionRequestListener;
+import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
@@ -44,18 +43,14 @@
static final int INITIAL_MESSAGE_SIZE = 20;
- private final IHyracksRootContext ctx;
-
- private final IPartitionRequestListener partitionRequestListener;
+ private final PartitionManager partitionManager;
private final MuxDemux md;
private NetworkAddress networkAddress;
- public NetworkManager(IHyracksRootContext ctx, InetAddress inetAddress,
- IPartitionRequestListener partitionRequestListener, int nThreads) throws IOException {
- this.ctx = ctx;
- this.partitionRequestListener = partitionRequestListener;
+ public NetworkManager(InetAddress inetAddress, PartitionManager partitionManager, int nThreads) throws IOException {
+ this.partitionManager = partitionManager;
md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
MAX_CONNECTION_ATTEMPTS);
}
@@ -102,9 +97,9 @@
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Received initial partition request: " + pid + " on channel: " + ccb);
}
- noc = new NetworkOutputChannel(ctx, ccb, 5);
+ noc = new NetworkOutputChannel(ccb, 1);
try {
- partitionRequestListener.registerPartitionRequest(pid, noc);
+ partitionManager.registerPartitionRequest(pid, noc);
} catch (HyracksException e) {
noc.abort();
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
index 185768e..9024e18 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -19,7 +19,7 @@
import java.util.Deque;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
@@ -27,17 +27,23 @@
public class NetworkOutputChannel implements IFrameWriter {
private final ChannelControlBlock ccb;
+ private final int nBuffers;
+
private final Deque<ByteBuffer> emptyStack;
private boolean aborted;
- public NetworkOutputChannel(IHyracksRootContext ctx, ChannelControlBlock ccb, int nBuffers) {
+ public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
this.ccb = ccb;
+ this.nBuffers = nBuffers;
emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ }
+
+ public void setTaskContext(IHyracksTaskContext ctx) {
for (int i = 0; i < nBuffers; ++i) {
emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
}
- ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
}
@Override
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java
deleted file mode 100644
index 20ed49c..0000000
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/IPartitionRequestListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.partitions;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.partitions.PartitionId;
-
-public interface IPartitionRequestListener {
- public void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer) throws HyracksException;
-}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
index e75fe1c..ae4fd2b 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -18,7 +18,7 @@
import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
@@ -27,7 +27,7 @@
import edu.uci.ics.hyracks.control.nc.io.IOManager;
public class MaterializedPartition implements IPartition {
- private final IHyracksRootContext ctx;
+ private final IHyracksTaskContext ctx;
private final FileReference partitionFile;
@@ -35,7 +35,7 @@
private final IOManager ioManager;
- public MaterializedPartition(IHyracksRootContext ctx, FileReference partitionFile, Executor executor,
+ public MaterializedPartition(IHyracksTaskContext ctx, FileReference partitionFile, Executor executor,
IOManager ioManager) {
this.ctx = ctx;
this.partitionFile = partitionFile;
@@ -44,6 +44,11 @@
}
@Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ @Override
public void deallocate() {
partitionFile.delete();
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index 0ea3124..16e31f7 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,12 +21,14 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class MaterializedPartitionInputChannel implements IInputChannel {
+ private final int nBuffers;
+
private final Queue<ByteBuffer> emptyQueue;
private final Queue<ByteBuffer> fullQueue;
@@ -41,12 +43,9 @@
private Object attachment;
- public MaterializedPartitionInputChannel(IHyracksRootContext ctx, int nBuffers, PartitionId pid,
- PartitionManager manager) {
+ public MaterializedPartitionInputChannel(int nBuffers, PartitionId pid, PartitionManager manager) {
+ this.nBuffers = nBuffers;
this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
- for (int i = 0; i < nBuffers; ++i) {
- emptyQueue.add(ctx.allocateFrame());
- }
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
this.pid = pid;
this.manager = manager;
@@ -83,7 +82,10 @@
}
@Override
- public void open() throws HyracksDataException {
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyQueue.add(ctx.allocateFrame());
+ }
IPartition partition = manager.getPartition(pid);
partition.writeTo(writer);
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
index 1f19cbe..7bd0eb1 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -20,7 +20,7 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -33,7 +33,7 @@
public class MaterializedPartitionWriter implements IFrameWriter {
private static final Logger LOGGER = Logger.getLogger(MaterializedPartitionWriter.class.getName());
- private final IHyracksRootContext ctx;
+ private final IHyracksTaskContext ctx;
private final PartitionManager manager;
@@ -51,7 +51,7 @@
private boolean failed;
- public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+ public MaterializedPartitionWriter(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
TaskAttemptId taId, Executor executor) {
this.ctx = ctx;
this.manager = manager;
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 08f6ea4..62320c5 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -20,7 +20,7 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -34,7 +34,7 @@
public class MaterializingPipelinedPartition implements IFrameWriter, IPartition {
private static final Logger LOGGER = Logger.getLogger(MaterializingPipelinedPartition.class.getName());
- private final IHyracksRootContext ctx;
+ private final IHyracksTaskContext ctx;
private final Executor executor;
@@ -56,7 +56,7 @@
private boolean failed;
- public MaterializingPipelinedPartition(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+ public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid,
TaskAttemptId taId, Executor executor) {
this.ctx = ctx;
this.executor = executor;
@@ -67,6 +67,11 @@
}
@Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ @Override
public void deallocate() {
fRef.delete();
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index ce40fe6..45c091a 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -21,7 +21,6 @@
import java.util.List;
import java.util.Map;
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
@@ -34,9 +33,10 @@
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
-public class PartitionManager implements IPartitionRequestListener {
+public class PartitionManager {
private final NodeControllerService ncs;
private final Map<PartitionId, List<IPartition>> partitionMap;
@@ -93,12 +93,12 @@
}
}
- @Override
- public synchronized void registerPartitionRequest(PartitionId partitionId, IFrameWriter writer)
+ public synchronized void registerPartitionRequest(PartitionId partitionId, NetworkOutputChannel writer)
throws HyracksException {
List<IPartition> pList = partitionMap.get(partitionId);
if (pList != null && !pList.isEmpty()) {
IPartition partition = pList.get(0);
+ writer.setTaskContext(partition.getTaskContext());
partition.writeTo(writer);
if (!partition.isReusable()) {
partitionMap.remove(partitionId);
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
index 09dca9e..e427cf3 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PipelinedPartition.java
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
@@ -24,6 +25,8 @@
import edu.uci.ics.hyracks.control.common.job.PartitionState;
public class PipelinedPartition implements IFrameWriter, IPartition {
+ private final IHyracksTaskContext ctx;
+
private final PartitionManager manager;
private final PartitionId pid;
@@ -36,13 +39,19 @@
private boolean failed;
- public PipelinedPartition(PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
+ public PipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid, TaskAttemptId taId) {
+ this.ctx = ctx;
this.manager = manager;
this.pid = pid;
this.taId = taId;
}
@Override
+ public IHyracksTaskContext getTaskContext() {
+ return ctx;
+ }
+
+ @Override
public boolean isReusable() {
return false;
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
index f314e09..b1e58fc 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/ReceiveSideMaterializingCollector.java
@@ -26,7 +26,7 @@
import edu.uci.ics.hyracks.api.comm.IFrameReader;
import edu.uci.ics.hyracks.api.comm.IPartitionCollector;
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -35,7 +35,7 @@
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class ReceiveSideMaterializingCollector implements IPartitionCollector {
- private final IHyracksRootContext ctx;
+ private final IHyracksTaskContext ctx;
private PartitionManager manager;
@@ -45,7 +45,7 @@
private final Executor executor;
- public ReceiveSideMaterializingCollector(IHyracksRootContext ctx, PartitionManager manager,
+ public ReceiveSideMaterializingCollector(IHyracksTaskContext ctx, PartitionManager manager,
IPartitionCollector collector, TaskAttemptId taId, Executor executor) {
this.ctx = ctx;
this.manager = manager;
@@ -123,7 +123,7 @@
IInputChannel channel = pc.getInputChannel();
try {
channel.registerMonitor(this);
- channel.open();
+ channel.open(ctx);
mpw.open();
while (true) {
if (nAvailableFrames.get() > 0) {
@@ -150,7 +150,7 @@
mpw.close();
channel.close();
delegate.addPartitions(Collections.singleton(new PartitionChannel(pid,
- new MaterializedPartitionInputChannel(ctx, 1, pid, manager))));
+ new MaterializedPartitionInputChannel(1, pid, manager))));
} catch (HyracksException e) {
}
}
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
index 045fa52..9e42277 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/runtime/RootHyracksContext.java
@@ -14,33 +14,18 @@
*/
package edu.uci.ics.hyracks.control.nc.runtime;
-import java.nio.ByteBuffer;
-
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.io.IIOManager;
public class RootHyracksContext implements IHyracksRootContext {
- private final int frameSize;
-
private final IIOManager ioManager;
- public RootHyracksContext(int frameSize, IIOManager ioManager) {
- this.frameSize = frameSize;
+ public RootHyracksContext(IIOManager ioManager) {
this.ioManager = ioManager;
}
@Override
- public int getFrameSize() {
- return frameSize;
- }
-
- @Override
public IIOManager getIOManager() {
return ioManager;
}
-
- @Override
- public ByteBuffer allocateFrame() {
- return ByteBuffer.allocate(frameSize);
- }
}
\ No newline at end of file
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index 2cf43da5b..bb9669d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -46,9 +46,9 @@
Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
Joblet ji = jobletMap.get(pid.getJobId());
if (ji != null) {
- PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getRootContext(),
- ncs.getNetworkManager(), new InetSocketAddress(InetAddress.getByAddress(networkAddress
- .getIpAddress()), networkAddress.getPort()), pid, 5));
+ PartitionChannel channel = new PartitionChannel(pid, new NetworkInputChannel(ncs.getNetworkManager(),
+ new InetSocketAddress(InetAddress.getByAddress(networkAddress.getIpAddress()),
+ networkAddress.getPort()), pid, 5));
ji.reportPartitionAvailability(channel);
}
} catch (Exception e) {
diff --git a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
index 44be4b3..0c0fa3d 100644
--- a/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
+++ b/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/StartTasksWork.java
@@ -187,34 +187,32 @@
IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition,
td.getInputPartitionCounts()[i], td.getPartitionCount());
if (cPolicy.materializeOnReceiveSide()) {
- return new ReceiveSideMaterializingCollector(ncs.getRootContext(), ncs.getPartitionManager(), collector,
+ return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector,
task.getTaskAttemptId(), ncs.getExecutor());
} else {
return collector;
}
}
- private IPartitionWriterFactory createPartitionWriterFactory(IHyracksTaskContext ctx, IConnectorPolicy cPolicy,
- final JobId jobId, final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId taId,
- EnumSet<JobFlag> flags) {
+ private IPartitionWriterFactory createPartitionWriterFactory(final IHyracksTaskContext ctx,
+ IConnectorPolicy cPolicy, final JobId jobId, final IConnectorDescriptor conn, final int senderIndex,
+ final TaskAttemptId taId, EnumSet<JobFlag> flags) {
IPartitionWriterFactory factory;
if (cPolicy.materializeOnSendSide()) {
if (cPolicy.consumerWaitsForProducerToFinish()) {
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializedPartitionWriter(ncs.getRootContext(), ncs.getPartitionManager(),
- new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
- ncs.getExecutor());
+ return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
+ conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
}
};
} else {
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new MaterializingPipelinedPartition(ncs.getRootContext(), ncs.getPartitionManager(),
- new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId,
- ncs.getExecutor());
+ return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(
+ jobId, conn.getConnectorId(), senderIndex, receiverIndex), taId, ncs.getExecutor());
}
};
}
@@ -222,7 +220,7 @@
factory = new IPartitionWriterFactory() {
@Override
public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
- return new PipelinedPartition(ncs.getPartitionManager(), new PartitionId(jobId,
+ return new PipelinedPartition(ctx, ncs.getPartitionManager(), new PartitionId(jobId,
conn.getConnectorId(), senderIndex, receiverIndex), taId);
}
};
diff --git a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index 2ec4b47..b095332 100644
--- a/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -17,7 +17,7 @@
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOpenableDataWriter;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -26,7 +26,7 @@
private final IOpenableDataWriter<Object[]> writer;
private final FrameDeserializer frameDeserializer;
- public FrameDeserializingDataWriter(IHyracksRootContext ctx, IOpenableDataWriter<Object[]> writer,
+ public FrameDeserializingDataWriter(IHyracksTaskContext ctx, IOpenableDataWriter<Object[]> writer,
RecordDescriptor recordDescriptor) {
this.writer = writer;
this.frameDeserializer = new FrameDeserializer(ctx.getFrameSize(), recordDescriptor);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
index da5b0bc..23df4bb 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/collectors/PartitionCollector.java
@@ -51,7 +51,7 @@
PartitionId pid = pc.getPartitionId();
IInputChannel channel = pc.getInputChannel();
pa.addPartition(pid, channel);
- channel.open();
+ channel.open(ctx);
}
}
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
index d111954..0c569f2 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/BSTMemMgr.java
@@ -16,7 +16,7 @@
import java.nio.ByteBuffer;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -33,7 +33,7 @@
*/
public class BSTMemMgr implements IMemoryManager {
- private final IHyracksCommonContext ctx;
+ private final IHyracksTaskContext ctx;
public static int frameSize;
private ByteBuffer[] frames;
@@ -50,7 +50,7 @@
private Slot[] parentRes;
private int lastFrame;
- public BSTMemMgr(IHyracksCommonContext ctx, int memSize) {
+ public BSTMemMgr(IHyracksTaskContext ctx, int memSize) {
this.ctx = ctx;
frameSize = ctx.getFrameSize();
convertBuffer = ByteBuffer.allocate(4);
diff --git a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
index 3de9742..680b98e 100644
--- a/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
+++ b/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/FrameSorter.java
@@ -19,7 +19,7 @@
import java.util.List;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputer;
@@ -31,7 +31,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
public class FrameSorter {
- private final IHyracksCommonContext ctx;
+ private final IHyracksTaskContext ctx;
private final int[] sortFields;
private final INormalizedKeyComputer nkc;
private final IBinaryComparator[] comparators;
@@ -48,7 +48,7 @@
private int[] tPointers;
private int tupleCount;
- public FrameSorter(IHyracksCommonContext ctx, int[] sortFields,
+ public FrameSorter(IHyracksTaskContext ctx, int[] sortFields,
INormalizedKeyComputerFactory firstKeyNormalizerFactory, IBinaryComparatorFactory[] comparatorFactories,
RecordDescriptor recordDescriptor) {
this.ctx = ctx;
diff --git a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
index d65952a..c5f9c2f 100644
--- a/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
+++ b/hyracks-storage-am-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/invertedindex/impls/InvertedIndex.java
@@ -448,11 +448,6 @@
private final int FRAME_SIZE = 32768;
@Override
- public ByteBuffer allocateFrame() {
- return ByteBuffer.allocate(FRAME_SIZE);
- }
-
- @Override
public int getFrameSize() {
return FRAME_SIZE;
}
@@ -461,6 +456,11 @@
public IIOManager getIOManager() {
return null;
}
+
+ @Override
+ public ByteBuffer allocateFrame() {
+ return ByteBuffer.allocate(FRAME_SIZE);
+ }
}
@Override
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
index f87e532..0f59d63 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/storage/am/config/AccessMethodTestsConfig.java
@@ -20,10 +20,10 @@
*/
public class AccessMethodTestsConfig {
// Test params for RTree, LSMRTree and LSMRTreeWithAntiMatterTuples.
- public static final int RTREE_NUM_TUPLES_TO_INSERT = 10000;
+ public static final int RTREE_NUM_TUPLES_TO_INSERT = 100;
public static final int RTREE_NUM_INSERT_ROUNDS = 2;
public static final int RTREE_NUM_DELETE_ROUNDS = 2;
- public static final int RTREE_MULTITHREAD_NUM_OPERATIONS = 10000;
+ public static final int RTREE_MULTITHREAD_NUM_OPERATIONS = 100;
public static final boolean RTREE_TEST_RSTAR_POLICY = true;
// Test params for LSMRTree and LSMRTreeWithAntiMatterTuples.
public static final int LSM_RTREE_BULKLOAD_ROUNDS = 5;
@@ -31,11 +31,11 @@
public static final boolean LSM_RTREE_TEST_RSTAR_POLICY = false;
// Test params for BTree, LSMBTree.
- public static final int BTREE_NUM_TUPLES_TO_INSERT = 10000;
+ public static final int BTREE_NUM_TUPLES_TO_INSERT = 100;
public static final int BTREE_NUM_INSERT_ROUNDS = 3;
public static final int BTREE_NUM_DELETE_ROUNDS = 3;
public static final int BTREE_NUM_UPDATE_ROUNDS = 3;
- public static final int BTREE_MULTITHREAD_NUM_OPERATIONS = 10000;
+ public static final int BTREE_MULTITHREAD_NUM_OPERATIONS = 100;
// Test params for LSMBTree only.
public static final int LSM_BTREE_BULKLOAD_ROUNDS = 5;
public static final int LSM_BTREE_MAX_TREES_TO_MERGE = 10;
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
index 7231f67..7612db9 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestJobletContext.java
@@ -29,27 +29,26 @@
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
public class TestJobletContext implements IHyracksJobletContext {
+ private final int frameSize;
private final INCApplicationContext appContext;
private JobId jobId;
private WorkspaceFileFactory fileFactory;
- public TestJobletContext(INCApplicationContext appContext, JobId jobId) throws HyracksException {
+ public TestJobletContext(int frameSize, INCApplicationContext appContext, JobId jobId) throws HyracksException {
+ this.frameSize = frameSize;
this.appContext = appContext;
this.jobId = jobId;
fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
}
- @Override
public ByteBuffer allocateFrame() {
- return appContext.getRootContext().allocateFrame();
+ return ByteBuffer.allocate(frameSize);
}
- @Override
public int getFrameSize() {
- return appContext.getRootContext().getFrameSize();
+ return frameSize;
}
- @Override
public IIOManager getIOManager() {
return appContext.getRootContext().getIOManager();
}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
index 93409ef..e195036 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestRootContext.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.test.support;
import java.io.File;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -27,27 +26,15 @@
import edu.uci.ics.hyracks.control.nc.io.IOManager;
public class TestRootContext implements IHyracksRootContext {
- private int frameSize;
private IOManager ioManager;
- public TestRootContext(int frameSize) throws HyracksException {
- this.frameSize = frameSize;
+ public TestRootContext() throws HyracksException {
List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
devices.add(new IODeviceHandle(new File(System.getProperty("java.io.tmpdir")), "."));
ioManager = new IOManager(devices, Executors.newCachedThreadPool());
}
@Override
- public ByteBuffer allocateFrame() {
- return ByteBuffer.allocate(frameSize);
- }
-
- @Override
- public int getFrameSize() {
- return frameSize;
- }
-
- @Override
public IIOManager getIOManager() {
return ioManager;
}
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
index e18c0b8..c122b25 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestTaskContext.java
@@ -30,11 +30,11 @@
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
public class TestTaskContext implements IHyracksTaskContext {
- private final IHyracksJobletContext jobletContext;
+ private final TestJobletContext jobletContext;
private final TaskAttemptId taskId;
private WorkspaceFileFactory fileFactory;
- public TestTaskContext(IHyracksJobletContext jobletContext, TaskAttemptId taskId) throws HyracksException {
+ public TestTaskContext(TestJobletContext jobletContext, TaskAttemptId taskId) throws HyracksException {
this.jobletContext = jobletContext;
this.taskId = taskId;
fileFactory = new WorkspaceFileFactory(this, (IOManager) getIOManager());
diff --git a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
index 5d488c1..71da634 100644
--- a/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
+++ b/hyracks-test-support/src/main/java/edu/uci/ics/hyracks/test/support/TestUtils.java
@@ -15,7 +15,6 @@
package edu.uci.ics.hyracks.test.support;
import edu.uci.ics.hyracks.api.application.INCApplicationContext;
-import edu.uci.ics.hyracks.api.context.IHyracksJobletContext;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.ActivityId;
@@ -28,9 +27,9 @@
public class TestUtils {
public static IHyracksTaskContext create(int frameSize) {
try {
- IHyracksRootContext rootCtx = new TestRootContext(frameSize);
+ IHyracksRootContext rootCtx = new TestRootContext();
INCApplicationContext appCtx = new TestNCApplicationContext(rootCtx, null);
- IHyracksJobletContext jobletCtx = new TestJobletContext(appCtx, new JobId(0));
+ TestJobletContext jobletCtx = new TestJobletContext(frameSize, appCtx, new JobId(0));
TaskAttemptId tid = new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), 0), 0);
IHyracksTaskContext taskCtx = new TestTaskContext(jobletCtx, tid);
return taskCtx;