Reverting the merge of fullstack_hyracks_result_distribution branch until all the tests pass.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3033 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index f103f4a..9ecc083 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -15,8 +15,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
- <source>1.7</source>
- <target>1.7</target>
+ <source>1.6</source>
+ <target>1.6</target>
</configuration>
</plugin>
</plugins>
@@ -40,11 +40,6 @@
<artifactId>hyracks-net</artifactId>
<version>0.2.3-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>edu.uci.ics.hyracks</groupId>
- <artifactId>hyracks-comm</artifactId>
- <version>0.2.3-SNAPSHOT</version>
- </dependency>
</dependencies>
<reporting>
<plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 290c59c..0195143 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -45,7 +45,6 @@
import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.io.IODeviceHandle;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -62,9 +61,7 @@
import edu.uci.ics.hyracks.control.common.work.FutureValue;
import edu.uci.ics.hyracks.control.common.work.WorkQueue;
import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
-import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -97,10 +94,6 @@
private final NetworkManager netManager;
- private final IDatasetPartitionManager datasetPartitionManager;
-
- private final DatasetNetworkManager datasetNetworkManager;
-
private final WorkQueue queue;
private final Timer timer;
@@ -147,11 +140,7 @@
throw new Exception("id not set");
}
partitionManager = new PartitionManager(this);
- netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
-
- datasetPartitionManager = new DatasetPartitionManager(this, executor);
- datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
- datasetPartitionManager, ncConfig.nNetThreads);
+ netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
queue = new WorkQueue();
jobletMap = new Hashtable<JobId, Joblet>();
@@ -216,7 +205,6 @@
LOGGER.log(Level.INFO, "Starting NodeControllerService");
ipc.start();
netManager.start();
- datasetNetworkManager.start();
IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -225,11 +213,10 @@
}
HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
- datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
- .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
- .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
- .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema));
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+ .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -260,10 +247,8 @@
LOGGER.log(Level.INFO, "Stopping NodeControllerService");
executor.shutdownNow();
partitionManager.close();
- datasetPartitionManager.close();
heartbeatTask.cancel();
netManager.stop();
- datasetNetworkManager.stop();
queue.stop();
LOGGER.log(Level.INFO, "Stopped NodeControllerService");
}
@@ -288,10 +273,6 @@
return netManager;
}
- public DatasetNetworkManager getDatasetNetworkManager() {
- return datasetNetworkManager;
- }
-
public PartitionManager getPartitionManager() {
return partitionManager;
}
@@ -316,7 +297,8 @@
return queue;
}
- private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
+ private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+ String ipaddrStr = ncConfig.dataIPAddress;
ipaddrStr = ipaddrStr.trim();
Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
Matcher m = pattern.matcher(ipaddrStr);
@@ -373,12 +355,6 @@
hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
- MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
- hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
- hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
- hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
- hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
-
IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
@@ -483,10 +459,6 @@
}
}
- public IDatasetPartitionManager getDatasetPartitionManager() {
- return datasetPartitionManager;
- }
-
public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
ccs.sendApplicationMessageToCC(data, appName, nodeId);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 5a3e9dd..eba3ec9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -34,7 +34,6 @@
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
@@ -349,11 +348,6 @@
}
@Override
- public IDatasetPartitionManager getDatasetPartitionManager() {
- return ncs.getDatasetPartitionManager();
- }
-
- @Override
public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext()
.getApplicationName(), nodeId);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
deleted file mode 100644
index 85f17b1..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.dataset;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
-import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
-
-public class DatasetPartitionManager implements IDatasetPartitionManager {
- private final NodeControllerService ncs;
-
- private final Executor executor;
-
- private final Map<JobId, DatasetPartitionWriter[]> partitionDatasetWriterMap;
-
- private final DefaultDeallocatableRegistry deallocatableRegistry;
-
- private final IWorkspaceFileFactory fileFactory;
-
- public DatasetPartitionManager(NodeControllerService ncs, Executor executor) {
- this.ncs = ncs;
- this.executor = executor;
- partitionDatasetWriterMap = new HashMap<JobId, DatasetPartitionWriter[]>();
- deallocatableRegistry = new DefaultDeallocatableRegistry();
- fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
- }
-
- @Override
- public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
- int partition, int nPartitions) throws HyracksException {
- DatasetPartitionWriter dpw = null;
- JobId jobId = ctx.getJobletContext().getJobId();
- try {
- ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
- nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, executor);
-
- DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
- if (writers == null) {
- writers = new DatasetPartitionWriter[nPartitions];
- partitionDatasetWriterMap.put(jobId, writers);
- }
- writers[partition] = dpw;
- } catch (Exception e) {
- throw new HyracksException(e);
- }
-
- return dpw;
- }
-
- @Override
- public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
- try {
- ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
- public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
- try {
- ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
- } catch (Exception e) {
- throw new HyracksException(e);
- }
- }
-
- @Override
- public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
- throws HyracksException {
- DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
- if (writers == null) {
- throw new HyracksException("Unknown JobId " + jobId);
- }
-
- DatasetPartitionWriter dpw = writers[partition];
- if (dpw == null) {
- throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
- }
-
- dpw.writeTo(writer);
- }
-
- @Override
- public IWorkspaceFileFactory getFileFactory() {
- return fileFactory;
- }
-
- @Override
- public void close() {
- deallocatableRegistry.close();
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
deleted file mode 100644
index 0f5895e..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.dataset;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.IPartition;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-
-public class DatasetPartitionWriter implements IFrameWriter, IPartition {
- private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
-
- private static final String FILE_PREFIX = "result_";
-
- private final IHyracksTaskContext ctx;
-
- private final IDatasetPartitionManager manager;
-
- private final JobId jobId;
-
- private final ResultSetId resultSetId;
-
- private final int partition;
-
- private final Executor executor;
-
- private final AtomicBoolean eos;
-
- private FileReference fRef;
-
- private IFileHandle handle;
-
- private long size;
-
- public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, int partition, Executor executor) {
- this.ctx = ctx;
- this.manager = manager;
- this.jobId = jobId;
- this.resultSetId = rsId;
- this.partition = partition;
- this.executor = executor;
- eos = new AtomicBoolean(false);
- }
-
- @Override
- public void open() throws HyracksDataException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("open(" + partition + ")");
- }
- fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(FILE_PREFIX + String.valueOf(partition));
- handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- size = 0;
- }
-
- @Override
- public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- size += ctx.getIOManager().syncWrite(handle, size, buffer);
- notifyAll();
- }
-
- @Override
- public void fail() throws HyracksDataException {
- try {
- manager.reportPartitionFailure(jobId, resultSetId, partition);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public synchronized void close() throws HyracksDataException {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("close(" + partition + ")");
- }
-
- try {
- eos.set(true);
- notifyAll();
- manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
- } catch (HyracksException e) {
- throw new HyracksDataException(e);
- }
- }
-
- @Override
- public IHyracksTaskContext getTaskContext() {
- return ctx;
- }
-
- private synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- while (offset >= size && !eos.get()) {
- try {
- wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- return ctx.getIOManager().syncRead(handle, offset, buffer);
- }
-
- @Override
- public void writeTo(final IFrameWriter writer) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- NetworkOutputChannel channel = (NetworkOutputChannel) writer;
- channel.setTaskContext(ctx);
- try {
- channel.open();
- try {
- long offset = 0;
- ByteBuffer buffer = ctx.allocateFrame();
- while (true) {
- buffer.clear();
- long size = read(offset, buffer);
- if (size < 0) {
- break;
- } else if (size < buffer.capacity()) {
- throw new HyracksDataException("Premature end of file");
- }
- offset += size;
- buffer.flip();
- channel.nextFrame(buffer);
- }
- } finally {
- channel.close();
- ctx.getIOManager().close(handle);
- }
- } catch (HyracksDataException e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
-
- @Override
- public boolean isReusable() {
- return true;
- }
-
- @Override
- public void deallocate() {
-
- }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
deleted file mode 100644
index 5b8b333..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
-import edu.uci.ics.hyracks.net.exceptions.NetException;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-
-public class DatasetNetworkManager implements IChannelConnectionFactory {
- private static final Logger LOGGER = Logger.getLogger(DatasetNetworkManager.class.getName());
-
- private static final int MAX_CONNECTION_ATTEMPTS = 5;
-
- static final int INITIAL_MESSAGE_SIZE = 20;
-
- private final IDatasetPartitionManager partitionManager;
-
- private final MuxDemux md;
-
- private NetworkAddress networkAddress;
-
- public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
- throws IOException {
- this.partitionManager = partitionManager;
- md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS);
- }
-
- public void start() throws IOException {
- md.start();
- InetSocketAddress sockAddr = md.getLocalAddress();
- networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
- }
-
- public NetworkAddress getNetworkAddress() {
- return networkAddress;
- }
-
- public void stop() {
-
- }
-
- public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
- MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
- return mConn.openChannel();
- }
-
- private class ChannelOpenListener implements IChannelOpenListener {
- @Override
- public void channelOpened(ChannelControlBlock channel) {
- channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
- channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
- }
- }
-
- private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
- private final ChannelControlBlock ccb;
-
- private NetworkOutputChannel noc;
-
- public InitialBufferAcceptor(ChannelControlBlock ccb) {
- this.ccb = ccb;
- }
-
- @Override
- public void accept(ByteBuffer buffer) {
- JobId jobId = new JobId(buffer.getLong());
- int partition = buffer.getInt();
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
- + partition + " on channel: " + ccb);
- }
- noc = new NetworkOutputChannel(ccb, 1);
- try {
- partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
- } catch (HyracksException e) {
- noc.abort();
- }
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void error(int ecode) {
- if (noc != null) {
- noc.abort();
- }
- }
- }
-
- public MuxDemuxPerformanceCounters getPerformanceCounters() {
- return md.getPerformanceCounters();
- }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
new file mode 100644
index 0000000..1d5af84
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkInputChannel implements IInputChannel {
+ private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+
+ private final NetworkManager netManager;
+
+ private final SocketAddress remoteAddress;
+
+ private final PartitionId partitionId;
+
+ private final Queue<ByteBuffer> fullQueue;
+
+ private final int nBuffers;
+
+ private ChannelControlBlock ccb;
+
+ private IInputChannelMonitor monitor;
+
+ private Object attachment;
+
+ public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
+ int nBuffers) {
+ this.netManager = netManager;
+ this.remoteAddress = remoteAddress;
+ this.partitionId = partitionId;
+ fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+ this.nBuffers = nBuffers;
+ }
+
+ @Override
+ public void registerMonitor(IInputChannelMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ @Override
+ public void setAttachment(Object attachment) {
+ this.attachment = attachment;
+ }
+
+ @Override
+ public Object getAttachment() {
+ return attachment;
+ }
+
+ @Override
+ public synchronized ByteBuffer getNextBuffer() {
+ return fullQueue.poll();
+ }
+
+ @Override
+ public void recycleBuffer(ByteBuffer buffer) {
+ buffer.clear();
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+ }
+
+ @Override
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+ try {
+ ccb = netManager.connect(remoteAddress);
+ } catch (Exception e) {
+ throw new HyracksDataException(e);
+ }
+ ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ for (int i = 0; i < nBuffers; ++i) {
+ ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+ }
+ ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+ writeBuffer.putLong(partitionId.getJobId().getId());
+ writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+ writeBuffer.putInt(partitionId.getSenderIndex());
+ writeBuffer.putInt(partitionId.getReceiverIndex());
+ writeBuffer.flip();
+ if (LOGGER.isLoggable(Level.FINE)) {
+ LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
+ }
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+
+ }
+
+ private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ fullQueue.add(buffer);
+ monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
+ }
+
+ @Override
+ public void close() {
+ monitor.notifyEndOfStream(NetworkInputChannel.this);
+ }
+
+ @Override
+ public void error(int ecode) {
+ monitor.notifyFailure(NetworkInputChannel.this);
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ // do nothing
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index c8e4e94..b805595 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -27,8 +27,6 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
@@ -38,7 +36,7 @@
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-public class NetworkManager implements IChannelConnectionFactory {
+public class NetworkManager {
private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
private static final int MAX_CONNECTION_ATTEMPTS = 5;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
new file mode 100644
index 0000000..9024e18
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkOutputChannel implements IFrameWriter {
+ private final ChannelControlBlock ccb;
+
+ private final int nBuffers;
+
+ private final Deque<ByteBuffer> emptyStack;
+
+ private boolean aborted;
+
+ public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
+ this.ccb = ccb;
+ this.nBuffers = nBuffers;
+ emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+ ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+ }
+
+ public void setTaskContext(IHyracksTaskContext ctx) {
+ for (int i = 0; i < nBuffers; ++i) {
+ emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
+ }
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ ByteBuffer destBuffer = null;
+ synchronized (this) {
+ while (true) {
+ if (aborted) {
+ throw new HyracksDataException("Connection has been aborted");
+ }
+ destBuffer = emptyStack.poll();
+ if (destBuffer != null) {
+ break;
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ buffer.position(0);
+ buffer.limit(destBuffer.capacity());
+ destBuffer.clear();
+ destBuffer.put(buffer);
+ destBuffer.flip();
+ ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ ccb.getWriteInterface().getFullBufferAcceptor().close();
+ }
+
+ void abort() {
+ ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+ synchronized (NetworkOutputChannel.this) {
+ aborted = true;
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+
+ private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+ @Override
+ public void accept(ByteBuffer buffer) {
+ synchronized (NetworkOutputChannel.this) {
+ emptyStack.push(buffer);
+ NetworkOutputChannel.this.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index ba6e6c3..16e31f7 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,7 +21,7 @@
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -82,7 +82,7 @@
}
@Override
- public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+ public void open(IHyracksTaskContext ctx) throws HyracksDataException {
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index e532a0b..45c091a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -28,12 +28,12 @@
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.IPartition;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
import edu.uci.ics.hyracks.control.common.job.PartitionState;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
import edu.uci.ics.hyracks.control.nc.io.IOManager;
import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class PartitionManager {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index 7ed9d11..bb9669d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -22,10 +22,10 @@
import edu.uci.ics.hyracks.api.comm.PartitionChannel;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
import edu.uci.ics.hyracks.control.common.work.AbstractWork;
import edu.uci.ics.hyracks.control.nc.Joblet;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
public class ReportPartitionAvailabilityWork extends AbstractWork {
private final NodeControllerService ncs;