Reverting the merge of fullstack_hyracks_result_distribution branch until all the tests pass.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_asterix_stabilization@3033 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-control/hyracks-control-cc/pom.xml b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
index 95598da..c7eedb3 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-cc/pom.xml
@@ -15,8 +15,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
index 82457fe..5a33891 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/ClusterControllerService.java
@@ -35,17 +35,12 @@
 import edu.uci.ics.hyracks.api.client.ClusterControllerInfo;
 import edu.uci.ics.hyracks.api.client.HyracksClientInterfaceFunctions;
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.context.ICCContext;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
 import edu.uci.ics.hyracks.api.topology.ClusterTopology;
 import edu.uci.ics.hyracks.api.topology.TopologyDefinitionParser;
 import edu.uci.ics.hyracks.control.cc.application.CCApplicationContext;
-import edu.uci.ics.hyracks.control.cc.dataset.DatasetDirectoryService;
 import edu.uci.ics.hyracks.control.cc.job.JobRun;
 import edu.uci.ics.hyracks.control.cc.web.WebServer;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationCreateWork;
@@ -53,23 +48,17 @@
 import edu.uci.ics.hyracks.control.cc.work.ApplicationMessageWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationStartWork;
 import edu.uci.ics.hyracks.control.cc.work.ApplicationStateChangeWork;
-import edu.uci.ics.hyracks.control.cc.work.GetDatasetDirectoryServiceInfoWork;
 import edu.uci.ics.hyracks.control.cc.work.GetIpAddressNodeNameMapWork;
 import edu.uci.ics.hyracks.control.cc.work.GetJobStatusWork;
 import edu.uci.ics.hyracks.control.cc.work.GetNodeControllersInfoWork;
-import edu.uci.ics.hyracks.control.cc.work.GetResultPartitionLocationsWork;
-import edu.uci.ics.hyracks.control.cc.work.GetResultStatusWork;
 import edu.uci.ics.hyracks.control.cc.work.JobStartWork;
 import edu.uci.ics.hyracks.control.cc.work.JobletCleanupNotificationWork;
 import edu.uci.ics.hyracks.control.cc.work.NodeHeartbeatWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterNodeWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionAvailibilityWork;
 import edu.uci.ics.hyracks.control.cc.work.RegisterPartitionRequestWork;
-import edu.uci.ics.hyracks.control.cc.work.RegisterResultPartitionLocationWork;
 import edu.uci.ics.hyracks.control.cc.work.RemoveDeadNodesWork;
 import edu.uci.ics.hyracks.control.cc.work.ReportProfilesWork;
-import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionFailureWork;
-import edu.uci.ics.hyracks.control.cc.work.ReportResultPartitionWriteCompletionWork;
 import edu.uci.ics.hyracks.control.cc.work.TaskCompleteWork;
 import edu.uci.ics.hyracks.control.cc.work.TaskFailureWork;
 import edu.uci.ics.hyracks.control.cc.work.UnregisterNodeWork;
@@ -126,8 +115,6 @@
 
     private final DeadNodeSweeper sweeper;
 
-    private final IDatasetDirectoryService datasetDirectoryService;
-
     private long jobCounter;
 
     public ClusterControllerService(final CCConfig ccConfig) throws Exception {
@@ -175,7 +162,6 @@
             }
         };
         sweeper = new DeadNodeSweeper();
-        datasetDirectoryService = new DatasetDirectoryService();
         jobCounter = 0;
     }
 
@@ -278,10 +264,6 @@
         return clusterIPC;
     }
 
-    public NetworkAddress getDatasetDirectoryServiceInfo() {
-        return new NetworkAddress(ccConfig.clientNetIpAddress.getBytes(), ccConfig.clientNetPort);
-    }
-
     private class DeadNodeSweeper extends TimerTask {
         @Override
         public void run() {
@@ -289,10 +271,6 @@
         }
     }
 
-    public IDatasetDirectoryService getDatasetDirectoryService() {
-        return datasetDirectoryService;
-    }
-
     private class HyracksClientInterfaceIPCI implements IIPCI {
         @Override
         public void deliverIncomingMessage(IIPCHandle handle, long mid, long rmid, Object payload, Exception exception) {
@@ -343,27 +321,6 @@
                     return;
                 }
 
-                case GET_DATASET_DIRECTORY_SERIVICE_INFO: {
-                    workQueue.schedule(new GetDatasetDirectoryServiceInfoWork(ClusterControllerService.this,
-                            new IPCResponder<NetworkAddress>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_RESULT_STATUS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultStatusFunction) fn;
-                    workQueue.schedule(new GetResultStatusWork(ClusterControllerService.this, gdrlf.getJobId(), gdrlf
-                            .getResultSetId(), new IPCResponder<Status>(handle, mid)));
-                    return;
-                }
-
-                case GET_DATASET_RESULT_LOCATIONS: {
-                    HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction gdrlf = (HyracksClientInterfaceFunctions.GetDatasetResultLocationsFunction) fn;
-                    workQueue.schedule(new GetResultPartitionLocationsWork(ClusterControllerService.this, gdrlf
-                            .getJobId(), gdrlf.getResultSetId(), gdrlf.getKnownRecords(),
-                            new IPCResponder<DatasetDirectoryRecord[]>(handle, mid)));
-                    return;
-                }
-
                 case WAIT_FOR_COMPLETION: {
                     HyracksClientInterfaceFunctions.WaitForCompletionFunction wfcf = (HyracksClientInterfaceFunctions.WaitForCompletionFunction) fn;
                     workQueue.schedule(new WaitForJobCompletionWork(ClusterControllerService.this, wfcf.getJobId(),
@@ -459,28 +416,6 @@
                     return;
                 }
 
-                case REGISTER_RESULT_PARTITION_LOCATION: {
-                    CCNCFunctions.RegisterResultPartitionLocationFunction rrplf = (CCNCFunctions.RegisterResultPartitionLocationFunction) fn;
-                    workQueue.schedule(new RegisterResultPartitionLocationWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getResultSetId(), rrplf.getOrderedResult(), rrplf.getPartition(), rrplf
-                            .getNPartitions(), rrplf.getNetworkAddress()));
-                    return;
-                }
-
-                case REPORT_RESULT_PARTITION_WRITE_COMPLETION: {
-                    CCNCFunctions.ReportResultPartitionWriteCompletionFunction rrplf = (CCNCFunctions.ReportResultPartitionWriteCompletionFunction) fn;
-                    workQueue.schedule(new ReportResultPartitionWriteCompletionWork(ClusterControllerService.this,
-                            rrplf.getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
-                    return;
-                }
-
-                case REPORT_RESULT_PARTITION_FAILURE: {
-                    CCNCFunctions.ReportResultPartitionFailureFunction rrplf = (CCNCFunctions.ReportResultPartitionFailureFunction) fn;
-                    workQueue.schedule(new ReportResultPartitionFailureWork(ClusterControllerService.this, rrplf
-                            .getJobId(), rrplf.getResultSetId(), rrplf.getPartition()));
-                    return;
-                }
-
                 case APPLICATION_STATE_CHANGE_RESPONSE: {
                     CCNCFunctions.ApplicationStateChangeResponseFunction astrf = (CCNCFunctions.ApplicationStateChangeResponseFunction) fn;
                     workQueue.schedule(new ApplicationStateChangeWork(ClusterControllerService.this, astrf));
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
index c96a319..c17acd0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/NodeControllerState.java
@@ -41,8 +41,6 @@
 
     private final NetworkAddress dataPort;
 
-    private final NetworkAddress datasetPort;
-
     private final Set<JobId> activeJobIds;
 
     private final String osName;
@@ -109,14 +107,6 @@
 
     private final long[] netSignalingBytesWritten;
 
-    private final long[] datasetNetPayloadBytesRead;
-
-    private final long[] datasetNetPayloadBytesWritten;
-
-    private final long[] datasetNetSignalingBytesRead;
-
-    private final long[] datasetNetSignalingBytesWritten;
-
     private final long[] ipcMessagesSent;
 
     private final long[] ipcMessageBytesSent;
@@ -133,7 +123,6 @@
         this.nodeController = nodeController;
         ncConfig = reg.getNCConfig();
         dataPort = reg.getDataPort();
-        datasetPort = reg.getDatasetPort();
         activeJobIds = new HashSet<JobId>();
 
         osName = reg.getOSName();
@@ -175,10 +164,6 @@
         netPayloadBytesWritten = new long[RRD_SIZE];
         netSignalingBytesRead = new long[RRD_SIZE];
         netSignalingBytesWritten = new long[RRD_SIZE];
-        datasetNetPayloadBytesRead = new long[RRD_SIZE];
-        datasetNetPayloadBytesWritten = new long[RRD_SIZE];
-        datasetNetSignalingBytesRead = new long[RRD_SIZE];
-        datasetNetSignalingBytesWritten = new long[RRD_SIZE];
         ipcMessagesSent = new long[RRD_SIZE];
         ipcMessageBytesSent = new long[RRD_SIZE];
         ipcMessagesReceived = new long[RRD_SIZE];
@@ -211,10 +196,6 @@
         netPayloadBytesWritten[rrdPtr] = hbData.netPayloadBytesWritten;
         netSignalingBytesRead[rrdPtr] = hbData.netSignalingBytesRead;
         netSignalingBytesWritten[rrdPtr] = hbData.netSignalingBytesWritten;
-        datasetNetPayloadBytesRead[rrdPtr] = hbData.datasetNetPayloadBytesRead;
-        datasetNetPayloadBytesWritten[rrdPtr] = hbData.datasetNetPayloadBytesWritten;
-        datasetNetSignalingBytesRead[rrdPtr] = hbData.datasetNetSignalingBytesRead;
-        datasetNetSignalingBytesWritten[rrdPtr] = hbData.datasetNetSignalingBytesWritten;
         ipcMessagesSent[rrdPtr] = hbData.ipcMessagesSent;
         ipcMessageBytesSent[rrdPtr] = hbData.ipcMessageBytesSent;
         ipcMessagesReceived[rrdPtr] = hbData.ipcMessagesReceived;
@@ -246,10 +227,6 @@
         return dataPort;
     }
 
-    public NetworkAddress getDatasetPort() {
-        return datasetPort;
-    }
-
     public JSONObject toSummaryJSON() throws JSONException {
         JSONObject o = new JSONObject();
         o.put("node-id", ncConfig.nodeId);
@@ -294,10 +271,6 @@
         o.put("net-payload-bytes-written", netPayloadBytesWritten);
         o.put("net-signaling-bytes-read", netSignalingBytesRead);
         o.put("net-signaling-bytes-written", netSignalingBytesWritten);
-        o.put("dataset-net-payload-bytes-read", datasetNetPayloadBytesRead);
-        o.put("dataset-net-payload-bytes-written", datasetNetPayloadBytesWritten);
-        o.put("dataset-net-signaling-bytes-read", datasetNetSignalingBytesRead);
-        o.put("dataset-net-signaling-bytes-written", datasetNetSignalingBytesWritten);
         o.put("ipc-messages-sent", ipcMessagesSent);
         o.put("ipc-message-bytes-sent", ipcMessageBytesSent);
         o.put("ipc-messages-received", ipcMessagesReceived);
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
deleted file mode 100644
index 13d0c30..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.dataset;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-
-/**
- * TODO(madhusudancs): The potential perils of this global dataset directory service implementation is that, the jobs
- * location information is never evicted from the memory and the memory usage grows as the number of jobs in the system
- * grows. What we should possibly do is, add an API call for the client to say that it received everything it has to for
- * the job (after it receives all the results) completely. Then we can just get rid of the location information for that
- * job.
- */
-public class DatasetDirectoryService implements IDatasetDirectoryService {
-    private final Map<JobId, Map<ResultSetId, ResultSetMetaData>> jobResultLocationsMap;
-
-    public DatasetDirectoryService() {
-        jobResultLocationsMap = new HashMap<JobId, Map<ResultSetId, ResultSetMetaData>>();
-    }
-
-    @Override
-    public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult,
-            int partition, int nPartitions, NetworkAddress networkAddress) {
-        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
-        if (rsMap == null) {
-            rsMap = new HashMap<ResultSetId, ResultSetMetaData>();
-            jobResultLocationsMap.put(jobId, rsMap);
-        }
-
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
-        if (resultSetMetaData == null) {
-            resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]);
-            rsMap.put(rsId, resultSetMetaData);
-        }
-
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        if (records[partition] == null) {
-            records[partition] = new DatasetDirectoryRecord();
-        }
-        records[partition].setNetworkAddress(networkAddress);
-        records[partition].start();
-        notifyAll();
-    }
-
-    @Override
-    public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
-        ddr.writeEOS();
-    }
-
-    @Override
-    public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) {
-        DatasetDirectoryRecord ddr = getDatasetDirectoryRecord(jobId, rsId, partition);
-        ddr.fail();
-    }
-
-    @Override
-    public synchronized Status getResultStatus(JobId jobId, ResultSetId rsId) throws HyracksDataException {
-        Map<ResultSetId, ResultSetMetaData> rsMap;
-        while ((rsMap = jobResultLocationsMap.get(jobId)) == null) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
-        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
-            throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
-        }
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-
-        ArrayList<Status> statuses = new ArrayList<Status>(records.length);
-        for (int i = 0; i < records.length; i++) {
-            statuses.add(records[i].getStatus());
-        }
-
-        // Default status is idle
-        Status status = Status.IDLE;
-        if (statuses.contains(Status.FAILED)) {
-            // Even if there is at least one failed entry we should return failed status.
-            return Status.FAILED;
-        } else if (statuses.contains(Status.RUNNING)) {
-            // If there are not failed entry and if there is at least one running entry we should return running status.
-            return Status.RUNNING;
-        } else {
-            // If each and every partition has reported success do we report success as the status.
-            int successCount = 0;
-            for (int i = 0; i < statuses.size(); i++) {
-                if (statuses.get(i) == Status.SUCCESS) {
-                    successCount++;
-                }
-            }
-            if (successCount == statuses.size()) {
-                return Status.SUCCESS;
-            }
-        }
-        return status;
-    }
-
-    @Override
-    public synchronized DatasetDirectoryRecord[] getResultPartitionLocations(JobId jobId, ResultSetId rsId,
-            DatasetDirectoryRecord[] knownRecords) throws HyracksDataException {
-        DatasetDirectoryRecord[] newRecords;
-        while ((newRecords = updatedRecords(jobId, rsId, knownRecords)) == null) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-        return newRecords;
-    }
-
-    public DatasetDirectoryRecord getDatasetDirectoryRecord(JobId jobId, ResultSetId rsId, int partition) {
-        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        return records[partition];
-    }
-
-    /**
-     * Compares the records already known by the client for the given job's result set id with the records that the
-     * dataset directory service knows and if there are any newly discovered records returns a whole array with the
-     * new records filled in.
-     * This method has a very convoluted logic. Here is the explanation of how it works.
-     * If the ordering constraint has to be enforced, the method obtains the first null record in the known records in
-     * the order of the partitions. It always traverses the array in the first to last order!
-     * If known records array or the first element in that array is null in the but the record for that partition now
-     * known to the directory service, the method fills in that record in the array and returns the array back.
-     * However, if the first known null record is not a first element in the array, by induction, all the previous
-     * known records should be known already be known to client and none of the records for the partitions ahead is
-     * known by the client yet. So, we check if the client has reached the end of stream for the partition corresponding
-     * to the record before the first known null record, i.e. the last known non-null record. If not, we just return
-     * null because we cannot expose any new locations until the client reaches end of stream for the last known record.
-     * If the client has reached the end of stream record for the last known non-null record, we check if the next record
-     * is discovered by the dataset directory service and if so, we fill it in the records array and return it back or
-     * send null otherwise.
-     * If the ordering is not required, we are free to return any newly discovered records back, so we just check if
-     * arrays are equal and if they are not we send the entire new updated array.
-     * 
-     * @param jobId
-     *            - Id of the job for which the directory records should be retrieved.
-     * @param rsId
-     *            - Id of the result set for which the directory records should be retrieved.
-     * @param knownRecords
-     *            - An array of directory records that the client is already aware of.
-     * @return
-     *         - Returns null if there aren't any newly discovered partitions enforcing the ordering constraint
-     * @throws HyracksDataException
-     *             TODO(madhusudancs): Think about caching (and still be stateless) instead of this ugly O(n) iterations for
-     *             every check. This already looks very expensive.
-     */
-    private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords)
-            throws HyracksDataException {
-        Map<ResultSetId, ResultSetMetaData> rsMap = jobResultLocationsMap.get(jobId);
-        if (rsMap == null) {
-            return null;
-        }
-
-        ResultSetMetaData resultSetMetaData = rsMap.get(rsId);
-        if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) {
-            throw new HyracksDataException("ResultSet locations uninitialized when it is expected to be initialized.");
-        }
-
-        boolean ordered = resultSetMetaData.getOrderedResult();
-        DatasetDirectoryRecord[] records = resultSetMetaData.getRecords();
-        /* If ordering is required, we should expose the dataset directory records only in the order, otherwise
-         * we can simply check if there are any newly discovered records and send the whole array back if there are.
-         */
-        if (ordered) {
-            // Iterate over the known records and find the last record which is not null.
-            int i = 0;
-            for (i = 0; i < records.length; i++) {
-                if (knownRecords == null) {
-                    if (records[0] != null) {
-                        knownRecords = new DatasetDirectoryRecord[records.length];
-                        knownRecords[0] = records[0];
-                        return knownRecords;
-                    }
-                    return null;
-                }
-                if (knownRecords[i] == null) {
-                    if ((i == 0 || knownRecords[i - 1].hasReachedReadEOS()) && records[i] != null) {
-                        knownRecords[i] = records[i];
-                        return knownRecords;
-                    }
-                    return null;
-                }
-            }
-        } else {
-            if (!Arrays.equals(records, knownRecords)) {
-                return records;
-            }
-        }
-        return null;
-    }
-
-    private class ResultSetMetaData {
-        private final boolean ordered;
-
-        private final DatasetDirectoryRecord[] records;
-
-        public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) {
-            this.ordered = ordered;
-            this.records = records;
-        }
-
-        public boolean getOrderedResult() {
-            return ordered;
-        }
-
-        public DatasetDirectoryRecord[] getRecords() {
-            return records;
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
deleted file mode 100644
index 3ac6acc..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetDatasetDirectoryServiceInfoWork.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetDatasetDirectoryServiceInfoWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-
-    private final IResultCallback<NetworkAddress> callback;
-
-    public GetDatasetDirectoryServiceInfoWork(ClusterControllerService ccs, IResultCallback<NetworkAddress> callback) {
-        this.ccs = ccs;
-        this.callback = callback;
-    }
-
-    @Override
-    public void doRun() {
-        try {
-            NetworkAddress addr = ccs.getDatasetDirectoryServiceInfo();
-            callback.setValue(addr);
-        } catch (Exception e) {
-            callback.setException(e);
-        }
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index a787b9f..2f23a2c 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -39,8 +39,7 @@
         Map<String, NodeControllerInfo> result = new LinkedHashMap<String, NodeControllerInfo>();
         Map<String, NodeControllerState> nodeMap = ccs.getNodeMap();
         for (Map.Entry<String, NodeControllerState> e : nodeMap.entrySet()) {
-            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort(), e
-                    .getValue().getDatasetPort()));
+            result.put(e.getKey(), new NodeControllerInfo(e.getKey(), NodeStatus.ALIVE, e.getValue().getDataPort()));
         }
         callback.setValue(result);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
deleted file mode 100644
index fd1d418..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord;
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetResultPartitionLocationsWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-
-    private final JobId jobId;
-
-    private final ResultSetId rsId;
-
-    private final DatasetDirectoryRecord[] knownRecords;
-
-    private final IResultCallback<DatasetDirectoryRecord[]> callback;
-
-    public GetResultPartitionLocationsWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.rsId = rsId;
-        this.knownRecords = knownRecords;
-        this.callback = callback;
-    }
-
-    @Override
-    public void doRun() {
-        final IDatasetDirectoryService dds = ccs.getDatasetDirectoryService();
-        ccs.getExecutor().execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    DatasetDirectoryRecord[] partitionLocations = dds.getResultPartitionLocations(jobId, rsId,
-                            knownRecords);
-                    callback.setValue(partitionLocations);
-                } catch (HyracksDataException e) {
-                    callback.setException(e);
-                }
-            }
-        });
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
deleted file mode 100644
index d2dadf5..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/GetResultStatusWork.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.DatasetDirectoryRecord.Status;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.IResultCallback;
-import edu.uci.ics.hyracks.control.common.work.SynchronizableWork;
-
-public class GetResultStatusWork extends SynchronizableWork {
-    private final ClusterControllerService ccs;
-
-    private final JobId jobId;
-
-    private final ResultSetId rsId;
-
-    private final IResultCallback<Status> callback;
-
-    public GetResultStatusWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            IResultCallback<Status> callback) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.rsId = rsId;
-        this.callback = callback;
-    }
-
-    @Override
-    public void doRun() {
-        try {
-            Status status = ccs.getDatasetDirectoryService().getResultStatus(jobId, rsId);
-            callback.setValue(status);
-        } catch (HyracksDataException e) {
-            callback.setException(e);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId;
-    }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
index b062d33..b6a33cd 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/JobStartWork.java
@@ -16,7 +16,6 @@
 
 import java.util.EnumSet;
 
-import edu.uci.ics.hyracks.api.dataset.IDatasetDirectoryService;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGenerator;
 import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory;
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
deleted file mode 100644
index f86e924..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class RegisterResultPartitionLocationWork extends AbstractWork {
-    private final ClusterControllerService ccs;
-
-    private final JobId jobId;
-
-    private final ResultSetId rsId;
-
-    private final boolean orderedResult;
-
-    private final int partition;
-
-    private final int nPartitions;
-
-    private final NetworkAddress networkAddress;
-
-    public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            boolean orderedResult, int partition, int nPartitions, NetworkAddress networkAddress) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.rsId = rsId;
-        this.orderedResult = orderedResult;
-        this.partition = partition;
-        this.nPartitions = nPartitions;
-        this.networkAddress = networkAddress;
-    }
-
-    @Override
-    public void run() {
-        ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
-                nPartitions, networkAddress);
-    }
-
-    @Override
-    public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition + " NPartitions@" + nPartitions
-                + " ResultPartitionLocation@" + networkAddress + " OrderedResult@" + orderedResult;
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
deleted file mode 100644
index 4aea41e..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionFailureWork.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionFailureWork extends AbstractWork {
-    private final ClusterControllerService ccs;
-
-    private final JobId jobId;
-
-    private final ResultSetId rsId;
-
-    private final int partition;
-
-    public ReportResultPartitionFailureWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, int partition) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.rsId = rsId;
-        this.partition = partition;
-    }
-
-    @Override
-    public void run() {
-        ccs.getDatasetDirectoryService().reportResultPartitionFailure(jobId, rsId, partition);
-    }
-
-    @Override
-    public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
-    }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
deleted file mode 100644
index 313b730..0000000
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.cc.work;
-
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.cc.ClusterControllerService;
-import edu.uci.ics.hyracks.control.common.work.AbstractWork;
-
-public class ReportResultPartitionWriteCompletionWork extends AbstractWork {
-    private final ClusterControllerService ccs;
-
-    private final JobId jobId;
-
-    private final ResultSetId rsId;
-
-    private final int partition;
-
-    public ReportResultPartitionWriteCompletionWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId,
-            int partition) {
-        this.ccs = ccs;
-        this.jobId = jobId;
-        this.rsId = rsId;
-        this.partition = partition;
-    }
-
-    @Override
-    public void run() {
-        ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition);
-    }
-
-    @Override
-    public String toString() {
-        return "JobId@" + jobId + " ResultSetId@" + rsId + " Partition@" + partition;
-    }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
index 3fc46ff..ff9d8a0 100644
--- a/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
+++ b/hyracks/hyracks-control/hyracks-control-cc/src/main/resources/static/javascript/adminconsole/NodeDetailsPage.js
@@ -58,10 +58,6 @@
         var netPayloadBytesWritten = result['net-payload-bytes-written'];
         var netSignalingBytesRead = result['net-signaling-bytes-read'];
         var netSignalingBytesWritten = result['net-signaling-bytes-written'];
-        var datasetNetPayloadBytesRead = result['dataset-net-payload-bytes-read'];
-        var datasetNetPayloadBytesWritten = result['dataset-net-payload-bytes-written'];
-        var datasetNetSignalingBytesRead = result['dataset-net-signaling-bytes-read'];
-        var datasetNetSignalingBytesWritten = result['dataset-net-signaling-bytes-written'];
         var ipcMessagesSent = result['ipc-messages-sent'];
         var ipcMessageBytesSent = result['ipc-message-bytes-sent'];
         var ipcMessagesReceived = result['ipc-messages-received'];
@@ -121,13 +117,9 @@
             }
             if (i < sysLoad.length - 1) {
                 netPayloadReadBWArray.push([ i, computeRate(netPayloadBytesRead, rrdPtr) ]);
-                netPayloadReadBWArray.push([ i, computeRate(datasetNetPayloadBytesRead, rrdPtr) ]);
                 netPayloadWriteBWArray.push([ i, computeRate(netPayloadBytesWritten, rrdPtr) ]);
-                netPayloadWriteBWArray.push([ i, computeRate(datasetNetPayloadBytesWritten, rrdPtr) ]);
                 netSignalingReadBWArray.push([ i, computeRate(netSignalingBytesRead, rrdPtr) ]);
-                netSignalingReadBWArray.push([ i, computeRate(datasetNetSignalingBytesRead, rrdPtr) ]);
                 netSignalingWriteBWArray.push([ i, computeRate(netSignalingBytesWritten, rrdPtr) ]);
-                netSignalingWriteBWArray.push([ i, computeRate(etSignalingBytesWritten, rrdPtr) ]);
                 ipcMessageSendRateArray.push([ i, computeRate(ipcMessagesSent, rrdPtr) ]);
                 ipcMessageBytesSendRateArray.push([ i, computeRate(ipcMessageBytesSent, rrdPtr) ]);
                 ipcMessageReceiveRateArray.push([ i, computeRate(ipcMessagesReceived, rrdPtr) ]);
diff --git a/hyracks/hyracks-control/hyracks-control-common/pom.xml b/hyracks/hyracks-control/hyracks-control-common/pom.xml
index 2efdd42..096f2e1 100644
--- a/hyracks/hyracks-control/hyracks-control-common/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-common/pom.xml
@@ -18,8 +18,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
index 55e4479..0c5bb2f 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/IClusterController.java
@@ -16,9 +16,7 @@
 
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.controllers.NodeRegistration;
@@ -48,13 +46,6 @@
 
     public void registerPartitionRequest(PartitionRequest partitionRequest) throws Exception;
 
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
-            int nPartitions, NetworkAddress networkAddress) throws Exception;
-
-    public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception;
-
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception;
-
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception;
 
     public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
index 9b34810..167eb4b 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NCConfig.java
@@ -37,9 +37,6 @@
     @Option(name = "-data-ip-address", usage = "IP Address to bind data listener", required = true)
     public String dataIPAddress;
 
-    @Option(name = "-result-ip-address", usage = "IP Address to bind dataset result distribution listener", required = true)
-    public String datasetIPAddress;
-
     @Option(name = "-iodevices", usage = "Comma separated list of IO Device mount points (default: One device in default temp folder)", required = false)
     public String ioDevices = System.getProperty("java.io.tmpdir");
 
@@ -69,7 +66,6 @@
         cList.add(nodeId);
         cList.add("-data-ip-address");
         cList.add(dataIPAddress);
-        cList.add(datasetIPAddress);
         cList.add("-iodevices");
         cList.add(ioDevices);
         cList.add("-dcache-client-servers");
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
index a897602..91cfecf 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/controllers/NodeRegistration.java
@@ -33,8 +33,6 @@
 
     private final NetworkAddress dataPort;
 
-    private final NetworkAddress datasetPort;
-
     private final String osName;
 
     private final String arch;
@@ -62,14 +60,13 @@
     private final HeartbeatSchema hbSchema;
 
     public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort,
-            NetworkAddress datasetPort, String osName, String arch, String osVersion, int nProcessors, String vmName,
-            String vmVersion, String vmVendor, String classpath, String libraryPath, String bootClasspath,
-            List<String> inputArguments, Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
+            String osName, String arch, String osVersion, int nProcessors, String vmName, String vmVersion,
+            String vmVendor, String classpath, String libraryPath, String bootClasspath, List<String> inputArguments,
+            Map<String, String> systemProperties, HeartbeatSchema hbSchema) {
         this.ncAddress = ncAddress;
         this.nodeId = nodeId;
         this.ncConfig = ncConfig;
         this.dataPort = dataPort;
-        this.datasetPort = datasetPort;
         this.osName = osName;
         this.arch = arch;
         this.osVersion = osVersion;
@@ -101,10 +98,6 @@
         return dataPort;
     }
 
-    public NetworkAddress getDatasetPort() {
-        return datasetPort;
-    }
-
     public String getOSName() {
         return osName;
     }
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
index 663c68a..1dba3bc 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/heartbeat/HeartbeatData.java
@@ -37,10 +37,6 @@
     public long netPayloadBytesWritten;
     public long netSignalingBytesRead;
     public long netSignalingBytesWritten;
-    public long datasetNetPayloadBytesRead;
-    public long datasetNetPayloadBytesWritten;
-    public long datasetNetSignalingBytesRead;
-    public long datasetNetSignalingBytesWritten;
     public long ipcMessagesSent;
     public long ipcMessageBytesSent;
     public long ipcMessagesReceived;
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
index b506b12..557a8cb 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/CCNCFunctions.java
@@ -36,7 +36,6 @@
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
 import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobFlag;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.job.JobStatus;
@@ -69,9 +68,6 @@
         REPORT_PROFILE,
         REGISTER_PARTITION_PROVIDER,
         REGISTER_PARTITION_REQUEST,
-        REGISTER_RESULT_PARTITION_LOCATION,
-        REPORT_RESULT_PARTITION_WRITE_COMPLETION,
-        REPORT_RESULT_PARTITION_FAILURE,
         APPLICATION_STATE_CHANGE_RESPONSE,
 
         NODE_REGISTRATION_RESULT,
@@ -442,127 +438,6 @@
         }
     }
 
-    public static class RegisterResultPartitionLocationFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-
-        private final ResultSetId rsId;
-
-        private final boolean orderedResult;
-
-        private final int partition;
-
-        private final int nPartitions;
-
-        private NetworkAddress networkAddress;
-
-        public RegisterResultPartitionLocationFunction(JobId jobId, ResultSetId rsId, boolean orderedResult,
-                int partition, int nPartitions, NetworkAddress networkAddress) {
-            this.jobId = jobId;
-            this.rsId = rsId;
-            this.orderedResult = orderedResult;
-            this.partition = partition;
-            this.nPartitions = nPartitions;
-            this.networkAddress = networkAddress;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REGISTER_RESULT_PARTITION_LOCATION;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public ResultSetId getResultSetId() {
-            return rsId;
-        }
-
-        public boolean getOrderedResult() {
-            return orderedResult;
-        }
-
-        public int getPartition() {
-            return partition;
-        }
-
-        public int getNPartitions() {
-            return nPartitions;
-        }
-
-        public NetworkAddress getNetworkAddress() {
-            return networkAddress;
-        }
-    }
-
-    public static class ReportResultPartitionWriteCompletionFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-
-        private final ResultSetId rsId;
-
-        private final int partition;
-
-        public ReportResultPartitionWriteCompletionFunction(JobId jobId, ResultSetId rsId, int partition) {
-            this.jobId = jobId;
-            this.rsId = rsId;
-            this.partition = partition;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_RESULT_PARTITION_WRITE_COMPLETION;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public ResultSetId getResultSetId() {
-            return rsId;
-        }
-
-        public int getPartition() {
-            return partition;
-        }
-    }
-
-    public static class ReportResultPartitionFailureFunction extends Function {
-        private static final long serialVersionUID = 1L;
-
-        private final JobId jobId;
-
-        private final ResultSetId rsId;
-
-        private final int partition;
-
-        public ReportResultPartitionFailureFunction(JobId jobId, ResultSetId rsId, int partition) {
-            this.jobId = jobId;
-            this.rsId = rsId;
-            this.partition = partition;
-        }
-
-        @Override
-        public FunctionId getFunctionId() {
-            return FunctionId.REPORT_RESULT_PARTITION_FAILURE;
-        }
-
-        public JobId getJobId() {
-            return jobId;
-        }
-
-        public ResultSetId getResultSetId() {
-            return rsId;
-        }
-
-        public int getPartition() {
-            return partition;
-        }
-    }
-
     public static class ApplicationStateChangeResponseFunction extends Function {
         private static final long serialVersionUID = 1L;
 
diff --git a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
index 091a5d2..bbaab4e 100644
--- a/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
+++ b/hyracks/hyracks-control/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java
@@ -16,9 +16,7 @@
 
 import java.util.List;
 
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.application.ApplicationStatus;
 import edu.uci.ics.hyracks.control.common.base.IClusterController;
@@ -97,28 +95,6 @@
     }
 
     @Override
-    public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, int partition,
-            int nPartitions, NetworkAddress networkAddress) throws Exception {
-        CCNCFunctions.RegisterResultPartitionLocationFunction fn = new CCNCFunctions.RegisterResultPartitionLocationFunction(
-                jobId, rsId, orderedResult, partition, nPartitions, networkAddress);
-        ipcHandle.send(-1, fn, null);
-    }
-
-    @Override
-    public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws Exception {
-        CCNCFunctions.ReportResultPartitionWriteCompletionFunction fn = new CCNCFunctions.ReportResultPartitionWriteCompletionFunction(
-                jobId, rsId, partition);
-        ipcHandle.send(-1, fn, null);
-    }
-
-    @Override
-    public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws Exception {
-        CCNCFunctions.ReportResultPartitionFailureFunction fn = new CCNCFunctions.ReportResultPartitionFailureFunction(
-                jobId, rsId, partition);
-        ipcHandle.send(-1, fn, null);
-    }
-
-    @Override
     public void notifyApplicationStateChange(String nodeId, String appName, ApplicationStatus status) throws Exception {
         CCNCFunctions.ApplicationStateChangeResponseFunction fn = new CCNCFunctions.ApplicationStateChangeResponseFunction(
                 nodeId, appName, status);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index f103f4a..9ecc083 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -15,8 +15,8 @@
         <artifactId>maven-compiler-plugin</artifactId>
         <version>2.0.2</version>
         <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
+          <source>1.6</source>
+          <target>1.6</target>
         </configuration>
       </plugin>
     </plugins>
@@ -40,11 +40,6 @@
   		<artifactId>hyracks-net</artifactId>
   		<version>0.2.3-SNAPSHOT</version>
   	</dependency>
-  	<dependency>
-  		<groupId>edu.uci.ics.hyracks</groupId>
-  		<artifactId>hyracks-comm</artifactId>
-  		<version>0.2.3-SNAPSHOT</version>
-  	</dependency>
   </dependencies>
   <reporting>
     <plugins>
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 290c59c..0195143 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -45,7 +45,6 @@
 
 import edu.uci.ics.hyracks.api.client.NodeControllerInfo;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.io.IODeviceHandle;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.control.common.AbstractRemoteService;
@@ -62,9 +61,7 @@
 import edu.uci.ics.hyracks.control.common.work.FutureValue;
 import edu.uci.ics.hyracks.control.common.work.WorkQueue;
 import edu.uci.ics.hyracks.control.nc.application.NCApplicationContext;
-import edu.uci.ics.hyracks.control.nc.dataset.DatasetPartitionManager;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.net.DatasetNetworkManager;
 import edu.uci.ics.hyracks.control.nc.net.NetworkManager;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -97,10 +94,6 @@
 
     private final NetworkManager netManager;
 
-    private final IDatasetPartitionManager datasetPartitionManager;
-
-    private final DatasetNetworkManager datasetNetworkManager;
-
     private final WorkQueue queue;
 
     private final Timer timer;
@@ -147,11 +140,7 @@
             throw new Exception("id not set");
         }
         partitionManager = new PartitionManager(this);
-        netManager = new NetworkManager(getIpAddress(ncConfig.dataIPAddress), partitionManager, ncConfig.nNetThreads);
-
-        datasetPartitionManager = new DatasetPartitionManager(this, executor);
-        datasetNetworkManager = new DatasetNetworkManager(getIpAddress(ncConfig.datasetIPAddress),
-                datasetPartitionManager, ncConfig.nNetThreads);
+        netManager = new NetworkManager(getIpAddress(ncConfig), partitionManager, ncConfig.nNetThreads);
 
         queue = new WorkQueue();
         jobletMap = new Hashtable<JobId, Joblet>();
@@ -216,7 +205,6 @@
         LOGGER.log(Level.INFO, "Starting NodeControllerService");
         ipc.start();
         netManager.start();
-        datasetNetworkManager.start();
         IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort));
         this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
         HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
@@ -225,11 +213,10 @@
         }
         HeartbeatSchema hbSchema = new HeartbeatSchema(gcInfos);
         ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netManager.getNetworkAddress(),
-                datasetNetworkManager.getNetworkAddress(), osMXBean.getName(), osMXBean.getArch(), osMXBean
-                        .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
-                        .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
-                        .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
-                runtimeMXBean.getSystemProperties(), hbSchema));
+                osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+                runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean
+                        .getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+                runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
 
         synchronized (this) {
             while (registrationPending) {
@@ -260,10 +247,8 @@
         LOGGER.log(Level.INFO, "Stopping NodeControllerService");
         executor.shutdownNow();
         partitionManager.close();
-        datasetPartitionManager.close();
         heartbeatTask.cancel();
         netManager.stop();
-        datasetNetworkManager.stop();
         queue.stop();
         LOGGER.log(Level.INFO, "Stopped NodeControllerService");
     }
@@ -288,10 +273,6 @@
         return netManager;
     }
 
-    public DatasetNetworkManager getDatasetNetworkManager() {
-        return datasetNetworkManager;
-    }
-
     public PartitionManager getPartitionManager() {
         return partitionManager;
     }
@@ -316,7 +297,8 @@
         return queue;
     }
 
-    private static InetAddress getIpAddress(String ipaddrStr) throws Exception {
+    private static InetAddress getIpAddress(NCConfig ncConfig) throws Exception {
+        String ipaddrStr = ncConfig.dataIPAddress;
         ipaddrStr = ipaddrStr.trim();
         Pattern pattern = Pattern.compile("(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})");
         Matcher m = pattern.matcher(ipaddrStr);
@@ -373,12 +355,6 @@
             hbData.netSignalingBytesRead = netPC.getSignalingBytesRead();
             hbData.netSignalingBytesWritten = netPC.getSignalingBytesWritten();
 
-            MuxDemuxPerformanceCounters datasetNetPC = datasetNetworkManager.getPerformanceCounters();
-            hbData.datasetNetPayloadBytesRead = datasetNetPC.getPayloadBytesRead();
-            hbData.datasetNetPayloadBytesWritten = datasetNetPC.getPayloadBytesWritten();
-            hbData.datasetNetSignalingBytesRead = datasetNetPC.getSignalingBytesRead();
-            hbData.datasetNetSignalingBytesWritten = datasetNetPC.getSignalingBytesWritten();
-
             IPCPerformanceCounters ipcPC = ipc.getPerformanceCounters();
             hbData.ipcMessagesSent = ipcPC.getMessageSentCount();
             hbData.ipcMessageBytesSent = ipcPC.getMessageBytesSent();
@@ -483,10 +459,6 @@
         }
     }
 
-    public IDatasetPartitionManager getDatasetPartitionManager() {
-        return datasetPartitionManager;
-    }
-
     public void sendApplicationMessageToCC(byte[] data, String appName, String nodeId) throws Exception {
         ccs.sendApplicationMessageToCC(data, appName, nodeId);
     }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
index 5a3e9dd..eba3ec9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/Task.java
@@ -34,7 +34,6 @@
 import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.io.FileReference;
@@ -349,11 +348,6 @@
     }
 
     @Override
-    public IDatasetPartitionManager getDatasetPartitionManager() {
-        return ncs.getDatasetPartitionManager();
-    }
-
-    @Override
     public void sendApplicationMessageToCC(byte[] message, String nodeId) throws Exception {
         this.ncs.sendApplicationMessageToCC(message, this.getJobletContext().getApplicationContext()
                 .getApplicationName(), nodeId);
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
deleted file mode 100644
index 85f17b1..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.dataset;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.control.nc.NodeControllerService;
-import edu.uci.ics.hyracks.control.nc.io.IOManager;
-import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
-import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
-
-public class DatasetPartitionManager implements IDatasetPartitionManager {
-    private final NodeControllerService ncs;
-
-    private final Executor executor;
-
-    private final Map<JobId, DatasetPartitionWriter[]> partitionDatasetWriterMap;
-
-    private final DefaultDeallocatableRegistry deallocatableRegistry;
-
-    private final IWorkspaceFileFactory fileFactory;
-
-    public DatasetPartitionManager(NodeControllerService ncs, Executor executor) {
-        this.ncs = ncs;
-        this.executor = executor;
-        partitionDatasetWriterMap = new HashMap<JobId, DatasetPartitionWriter[]>();
-        deallocatableRegistry = new DefaultDeallocatableRegistry();
-        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
-    }
-
-    @Override
-    public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, ResultSetId rsId, boolean orderedResult,
-            int partition, int nPartitions) throws HyracksException {
-        DatasetPartitionWriter dpw = null;
-        JobId jobId = ctx.getJobletContext().getJobId();
-        try {
-            ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
-                    nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
-            dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, executor);
-
-            DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
-            if (writers == null) {
-                writers = new DatasetPartitionWriter[nPartitions];
-                partitionDatasetWriterMap.put(jobId, writers);
-            }
-            writers[partition] = dpw;
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-
-        return dpw;
-    }
-
-    @Override
-    public void reportPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
-        try {
-            ncs.getClusterController().reportResultPartitionWriteCompletion(jobId, rsId, partition);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
-    public void reportPartitionFailure(JobId jobId, ResultSetId rsId, int partition) throws HyracksException {
-        try {
-            ncs.getClusterController().reportResultPartitionFailure(jobId, rsId, partition);
-        } catch (Exception e) {
-            throw new HyracksException(e);
-        }
-    }
-
-    @Override
-    public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
-            throws HyracksException {
-        DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
-        if (writers == null) {
-            throw new HyracksException("Unknown JobId " + jobId);
-        }
-
-        DatasetPartitionWriter dpw = writers[partition];
-        if (dpw == null) {
-            throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
-        }
-
-        dpw.writeTo(writer);
-    }
-
-    @Override
-    public IWorkspaceFileFactory getFileFactory() {
-        return fileFactory;
-    }
-
-    @Override
-    public void close() {
-        deallocatableRegistry.close();
-    }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
deleted file mode 100644
index 0f5895e..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.dataset;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.ResultSetId;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.api.partitions.IPartition;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-
-public class DatasetPartitionWriter implements IFrameWriter, IPartition {
-    private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
-
-    private static final String FILE_PREFIX = "result_";
-
-    private final IHyracksTaskContext ctx;
-
-    private final IDatasetPartitionManager manager;
-
-    private final JobId jobId;
-
-    private final ResultSetId resultSetId;
-
-    private final int partition;
-
-    private final Executor executor;
-
-    private final AtomicBoolean eos;
-
-    private FileReference fRef;
-
-    private IFileHandle handle;
-
-    private long size;
-
-    public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
-            ResultSetId rsId, int partition, Executor executor) {
-        this.ctx = ctx;
-        this.manager = manager;
-        this.jobId = jobId;
-        this.resultSetId = rsId;
-        this.partition = partition;
-        this.executor = executor;
-        eos = new AtomicBoolean(false);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("open(" + partition + ")");
-        }
-        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(FILE_PREFIX + String.valueOf(partition));
-        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
-                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-        size = 0;
-    }
-
-    @Override
-    public synchronized void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        size += ctx.getIOManager().syncWrite(handle, size, buffer);
-        notifyAll();
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        try {
-            manager.reportPartitionFailure(jobId, resultSetId, partition);
-        } catch (HyracksException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public synchronized void close() throws HyracksDataException {
-        if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info("close(" + partition + ")");
-        }
-
-        try {
-            eos.set(true);
-            notifyAll();
-            manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
-        } catch (HyracksException e) {
-            throw new HyracksDataException(e);
-        }
-    }
-
-    @Override
-    public IHyracksTaskContext getTaskContext() {
-        return ctx;
-    }
-
-    private synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
-        while (offset >= size && !eos.get()) {
-            try {
-                wait();
-            } catch (InterruptedException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-        return ctx.getIOManager().syncRead(handle, offset, buffer);
-    }
-
-    @Override
-    public void writeTo(final IFrameWriter writer) {
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
-                NetworkOutputChannel channel = (NetworkOutputChannel) writer;
-                channel.setTaskContext(ctx);
-                try {
-                    channel.open();
-                    try {
-                        long offset = 0;
-                        ByteBuffer buffer = ctx.allocateFrame();
-                        while (true) {
-                            buffer.clear();
-                            long size = read(offset, buffer);
-                            if (size < 0) {
-                                break;
-                            } else if (size < buffer.capacity()) {
-                                throw new HyracksDataException("Premature end of file");
-                            }
-                            offset += size;
-                            buffer.flip();
-                            channel.nextFrame(buffer);
-                        }
-                    } finally {
-                        channel.close();
-                        ctx.getIOManager().close(handle);
-                    }
-                } catch (HyracksDataException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        });
-    }
-
-    @Override
-    public boolean isReusable() {
-        return true;
-    }
-
-    @Override
-    public void deallocate() {
-
-    }
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
deleted file mode 100644
index 5b8b333..0000000
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/DatasetNetworkManager.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.control.nc.net;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import edu.uci.ics.hyracks.api.comm.NetworkAddress;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.job.JobId;
-import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
-import edu.uci.ics.hyracks.net.exceptions.NetException;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
-import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
-
-public class DatasetNetworkManager implements IChannelConnectionFactory {
-    private static final Logger LOGGER = Logger.getLogger(DatasetNetworkManager.class.getName());
-
-    private static final int MAX_CONNECTION_ATTEMPTS = 5;
-
-    static final int INITIAL_MESSAGE_SIZE = 20;
-
-    private final IDatasetPartitionManager partitionManager;
-
-    private final MuxDemux md;
-
-    private NetworkAddress networkAddress;
-
-    public DatasetNetworkManager(InetAddress inetAddress, IDatasetPartitionManager partitionManager, int nThreads)
-            throws IOException {
-        this.partitionManager = partitionManager;
-        md = new MuxDemux(new InetSocketAddress(inetAddress, 0), new ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS);
-    }
-
-    public void start() throws IOException {
-        md.start();
-        InetSocketAddress sockAddr = md.getLocalAddress();
-        networkAddress = new NetworkAddress(sockAddr.getAddress().getAddress(), sockAddr.getPort());
-    }
-
-    public NetworkAddress getNetworkAddress() {
-        return networkAddress;
-    }
-
-    public void stop() {
-
-    }
-
-    public ChannelControlBlock connect(SocketAddress remoteAddress) throws InterruptedException, NetException {
-        MultiplexedConnection mConn = md.connect((InetSocketAddress) remoteAddress);
-        return mConn.openChannel();
-    }
-
-    private class ChannelOpenListener implements IChannelOpenListener {
-        @Override
-        public void channelOpened(ChannelControlBlock channel) {
-            channel.getReadInterface().setFullBufferAcceptor(new InitialBufferAcceptor(channel));
-            channel.getReadInterface().getEmptyBufferAcceptor().accept(ByteBuffer.allocate(INITIAL_MESSAGE_SIZE));
-        }
-    }
-
-    private class InitialBufferAcceptor implements ICloseableBufferAcceptor {
-        private final ChannelControlBlock ccb;
-
-        private NetworkOutputChannel noc;
-
-        public InitialBufferAcceptor(ChannelControlBlock ccb) {
-            this.ccb = ccb;
-        }
-
-        @Override
-        public void accept(ByteBuffer buffer) {
-            JobId jobId = new JobId(buffer.getLong());
-            int partition = buffer.getInt();
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine("Received initial dataset partition read request for JobId: " + jobId + " partition: "
-                        + partition + " on channel: " + ccb);
-            }
-            noc = new NetworkOutputChannel(ccb, 1);
-            try {
-                partitionManager.initializeDatasetPartitionReader(jobId, partition, noc);
-            } catch (HyracksException e) {
-                noc.abort();
-            }
-        }
-
-        @Override
-        public void close() {
-
-        }
-
-        @Override
-        public void error(int ecode) {
-            if (noc != null) {
-                noc.abort();
-            }
-        }
-    }
-
-    public MuxDemuxPerformanceCounters getPerformanceCounters() {
-        return md.getPerformanceCounters();
-    }
-}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
new file mode 100644
index 0000000..1d5af84
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkInputChannel.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.api.channels.IInputChannel;
+import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkInputChannel implements IInputChannel {
+    private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
+
+    private final NetworkManager netManager;
+
+    private final SocketAddress remoteAddress;
+
+    private final PartitionId partitionId;
+
+    private final Queue<ByteBuffer> fullQueue;
+
+    private final int nBuffers;
+
+    private ChannelControlBlock ccb;
+
+    private IInputChannelMonitor monitor;
+
+    private Object attachment;
+
+    public NetworkInputChannel(NetworkManager netManager, SocketAddress remoteAddress, PartitionId partitionId,
+            int nBuffers) {
+        this.netManager = netManager;
+        this.remoteAddress = remoteAddress;
+        this.partitionId = partitionId;
+        fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
+        this.nBuffers = nBuffers;
+    }
+
+    @Override
+    public void registerMonitor(IInputChannelMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+    @Override
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    @Override
+    public synchronized ByteBuffer getNextBuffer() {
+        return fullQueue.poll();
+    }
+
+    @Override
+    public void recycleBuffer(ByteBuffer buffer) {
+        buffer.clear();
+        ccb.getReadInterface().getEmptyBufferAcceptor().accept(buffer);
+    }
+
+    @Override
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
+        try {
+            ccb = netManager.connect(remoteAddress);
+        } catch (Exception e) {
+            throw new HyracksDataException(e);
+        }
+        ccb.getReadInterface().setFullBufferAcceptor(new ReadFullBufferAcceptor());
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+        for (int i = 0; i < nBuffers; ++i) {
+            ccb.getReadInterface().getEmptyBufferAcceptor().accept(ctx.allocateFrame());
+        }
+        ByteBuffer writeBuffer = ByteBuffer.allocate(NetworkManager.INITIAL_MESSAGE_SIZE);
+        writeBuffer.putLong(partitionId.getJobId().getId());
+        writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
+        writeBuffer.putInt(partitionId.getSenderIndex());
+        writeBuffer.putInt(partitionId.getReceiverIndex());
+        writeBuffer.flip();
+        if (LOGGER.isLoggable(Level.FINE)) {
+            LOGGER.fine("Sending partition request: " + partitionId + " on channel: " + ccb);
+        }
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(writeBuffer);
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+
+    }
+
+    private class ReadFullBufferAcceptor implements ICloseableBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            fullQueue.add(buffer);
+            monitor.notifyDataAvailability(NetworkInputChannel.this, 1);
+        }
+
+        @Override
+        public void close() {
+            monitor.notifyEndOfStream(NetworkInputChannel.this);
+        }
+
+        @Override
+        public void error(int ecode) {
+            monitor.notifyFailure(NetworkInputChannel.this);
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            // do nothing
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
index c8e4e94..b805595 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkManager.java
@@ -27,8 +27,6 @@
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.IChannelConnectionFactory;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
 import edu.uci.ics.hyracks.net.exceptions.NetException;
@@ -38,7 +36,7 @@
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
 import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemuxPerformanceCounters;
 
-public class NetworkManager implements IChannelConnectionFactory {
+public class NetworkManager {
     private static final Logger LOGGER = Logger.getLogger(NetworkManager.class.getName());
 
     private static final int MAX_CONNECTION_ATTEMPTS = 5;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
new file mode 100644
index 0000000..9024e18
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/net/NetworkOutputChannel.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.net;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
+import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
+
+public class NetworkOutputChannel implements IFrameWriter {
+    private final ChannelControlBlock ccb;
+
+    private final int nBuffers;
+
+    private final Deque<ByteBuffer> emptyStack;
+
+    private boolean aborted;
+
+    public NetworkOutputChannel(ChannelControlBlock ccb, int nBuffers) {
+        this.ccb = ccb;
+        this.nBuffers = nBuffers;
+        emptyStack = new ArrayDeque<ByteBuffer>(nBuffers);
+        ccb.getWriteInterface().setEmptyBufferAcceptor(new WriteEmptyBufferAcceptor());
+    }
+
+    public void setTaskContext(IHyracksTaskContext ctx) {
+        for (int i = 0; i < nBuffers; ++i) {
+            emptyStack.push(ByteBuffer.allocateDirect(ctx.getFrameSize()));
+        }
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        ByteBuffer destBuffer = null;
+        synchronized (this) {
+            while (true) {
+                if (aborted) {
+                    throw new HyracksDataException("Connection has been aborted");
+                }
+                destBuffer = emptyStack.poll();
+                if (destBuffer != null) {
+                    break;
+                }
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    throw new HyracksDataException(e);
+                }
+            }
+        }
+        buffer.position(0);
+        buffer.limit(destBuffer.capacity());
+        destBuffer.clear();
+        destBuffer.put(buffer);
+        destBuffer.flip();
+        ccb.getWriteInterface().getFullBufferAcceptor().accept(destBuffer);
+    }
+
+    @Override
+    public void fail() throws HyracksDataException {
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        ccb.getWriteInterface().getFullBufferAcceptor().close();
+    }
+
+    void abort() {
+        ccb.getWriteInterface().getFullBufferAcceptor().error(1);
+        synchronized (NetworkOutputChannel.this) {
+            aborted = true;
+            NetworkOutputChannel.this.notifyAll();
+        }
+    }
+
+    private class WriteEmptyBufferAcceptor implements IBufferAcceptor {
+        @Override
+        public void accept(ByteBuffer buffer) {
+            synchronized (NetworkOutputChannel.this) {
+                emptyStack.push(buffer);
+                NetworkOutputChannel.this.notifyAll();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
index ba6e6c3..16e31f7 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionInputChannel.java
@@ -21,7 +21,7 @@
 import edu.uci.ics.hyracks.api.channels.IInputChannel;
 import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
@@ -82,7 +82,7 @@
     }
 
     @Override
-    public void open(IHyracksCommonContext ctx) throws HyracksDataException {
+    public void open(IHyracksTaskContext ctx) throws HyracksDataException {
         for (int i = 0; i < nBuffers; ++i) {
             emptyQueue.add(ctx.allocateFrame());
         }
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index e532a0b..45c091a 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -28,12 +28,12 @@
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
 import edu.uci.ics.hyracks.control.common.job.PartitionDescriptor;
 import edu.uci.ics.hyracks.control.common.job.PartitionState;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.net.NetworkOutputChannel;
 import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class PartitionManager {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
index 7ed9d11..bb9669d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/work/ReportPartitionAvailabilityWork.java
@@ -22,10 +22,10 @@
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
-import edu.uci.ics.hyracks.comm.channels.NetworkInputChannel;
 import edu.uci.ics.hyracks.control.common.work.AbstractWork;
 import edu.uci.ics.hyracks.control.nc.Joblet;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
 
 public class ReportPartitionAvailabilityWork extends AbstractWork {
     private final NodeControllerService ncs;