Added connector policy support to the runtime.

The CC scheduler now assigns an IConnectorPolicy to every connector while building task clusters (DefaultJobRunStateMachine.assignConnectorPolicy), stores the resulting map on the ActivityCluster, and ships it to the node controllers through the extended INodeController.startTasks() signature. On the NC side the policy picks the partition writer: PipelinedConnectorPolicy keeps the existing PipelinedPartition path, while the new SendSideMaterializedConnectorPolicy routes frames through the new MaterializedPartitionWriter, which spills to a workspace file and registers a MaterializedPartition on close(). PartitionManager gains a workspace file factory and a close() hook invoked from NodeControllerService.stop(); ActivityCluster.getTaskStateMap() is renamed to getTaskMap(); the task-cluster builder's fallback policy changes from pipelined to send-side materialized; and MaterializedPartition's copy loop now advances its read offset and closes the file handle in a finally block.

git-svn-id: https://hyracks.googlecode.com/svn/branches/hyracks_scheduling@399 123451ca-8445-de46-9d55-352943316053
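
Note: both policies touched by this patch implement the IConnectorPolicy contract from hyracks-control-common. The sketch below reconstructs only the two methods the patch exercises and assumes the interface extends Serializable (policy maps cross the CC/NC boundary in startTasks()); the real interface may declare more.

package edu.uci.ics.hyracks.control.common.job.dataflow;

import java.io.Serializable;

// Reconstruction for illustration; the Serializable bound is an assumption.
public interface IConnectorPolicy extends Serializable {
    // true when producer and consumer tasks must be scheduled together,
    // because frames are streamed directly between them
    boolean requiresProducerConsumerCoscheduling();

    // true when the consumer may only start after the producer has finished,
    // i.e. the partition must first be fully materialized on the send side
    boolean consumerWaitsForProducerToFinish();
}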
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
index 0c515f4..bf4dde1 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/ActivityCluster.java
@@ -20,8 +20,10 @@
 import java.util.Set;
 
 import edu.uci.ics.hyracks.api.dataflow.ActivityNodeId;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.control.cc.scheduler.IActivityClusterStateMachine;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
 
 public class ActivityCluster {
     private final JobRun jobRun;
@@ -38,6 +40,8 @@
 
     private IActivityClusterStateMachine acsm;
 
+    private Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies;
+
     public ActivityCluster(JobRun jobRun, Set<ActivityNodeId> activities) {
         this.jobRun = jobRun;
         this.activities = activities;
@@ -62,7 +66,7 @@
         return dependencies;
     }
 
-    public Map<ActivityNodeId, Task[]> getTaskStateMap() {
+    public Map<ActivityNodeId, Task[]> getTaskMap() {
         return taskStateMap;
     }
 
@@ -97,4 +101,12 @@
     public void notifyActivityClusterComplete() throws HyracksException {
         jobRun.getStateMachine().notifyActivityClusterComplete(this);
     }
+
+    public void setConnectorPolicyMap(Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) {
+        this.connectorPolicies = connectorPolicies;
+    }
+
+    public Map<ConnectorDescriptorId, IConnectorPolicy> getConnectorPolicyMap() {
+        return connectorPolicies;
+    }
 }
\ No newline at end of file
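
The connector-policy map added above is the hand-off point between the scheduler, which computes the policies while building task clusters, and the activity-cluster state machine, which ships them to the node controllers. A minimal sketch of that wiring for a single connector (the helper class and its choice of SendSideMaterializedConnectorPolicy are illustrative, not part of the patch):

package edu.uci.ics.hyracks.control.cc.scheduler;

import java.util.HashMap;
import java.util.Map;

import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
import edu.uci.ics.hyracks.control.cc.job.ActivityCluster;
import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;

// Illustrative helper, not part of this patch.
class ConnectorPolicyWiringSketch {
    static void wire(ActivityCluster ac, ConnectorDescriptorId cdId) {
        Map<ConnectorDescriptorId, IConnectorPolicy> policies =
                new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
        // Consumers of this connector must wait for a fully written partition file.
        policies.put(cdId, new SendSideMaterializedConnectorPolicy());
        ac.setConnectorPolicyMap(policies);
        // The state machine later reads the same map when issuing startTasks():
        assert ac.getConnectorPolicyMap() == policies;
    }
}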
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
index a69b144..1fb4bc3 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/job/manager/events/AbstractTaskLifecycleEvent.java
@@ -51,7 +51,7 @@
             Map<ActivityNodeId, ActivityCluster> activityClusterMap = run.getActivityClusterMap();
             ActivityCluster ac = activityClusterMap.get(tid.getActivityId());
             if (ac != null) {
-                Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
+                Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
                 Task[] taskStates = taskStateMap.get(tid.getActivityId());
                 if (taskStates != null && taskStates.length > tid.getPartition()) {
                     Task ts = taskStates[tid.getPartition()];
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
index cdd6a65..c325433 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultActivityClusterStateMachine.java
@@ -30,6 +30,7 @@
 
 import edu.uci.ics.hyracks.api.constraints.expressions.LValueConstraintExpression;
 import edu.uci.ics.hyracks.api.constraints.expressions.PartitionLocationExpression;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.dataflow.TaskId;
@@ -45,6 +46,7 @@
 import edu.uci.ics.hyracks.control.cc.job.TaskCluster;
 import edu.uci.ics.hyracks.control.cc.job.TaskClusterAttempt;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
 
 public class DefaultActivityClusterStateMachine implements IActivityClusterStateMachine {
     private static final Logger LOGGER = Logger.getLogger(DefaultActivityClusterStateMachine.class.getName());
@@ -223,6 +225,7 @@
         final UUID jobId = jobRun.getJobId();
         final JobActivityGraph jag = jobRun.getJobActivityGraph();
         final String appName = jag.getApplicationName();
+        final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = ac.getConnectorPolicyMap();
         for (Map.Entry<String, List<TaskAttemptDescriptor>> e : taskAttemptMap.entrySet()) {
             String nodeId = e.getKey();
             final List<TaskAttemptDescriptor> taskDescriptors = e.getValue();
@@ -234,7 +237,7 @@
                     public void run() {
                         try {
                             node.getNodeController().startTasks(appName, jobId, JavaSerializationUtils.serialize(jag),
-                                    taskDescriptors);
+                                    taskDescriptors, connectorPolicies);
                         } catch (IOException e) {
                             e.printStackTrace();
                         } catch (Exception e) {
diff --git a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
index 2ea10d6..5737743 100644
--- a/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
+++ b/hyracks-control-cc/src/main/java/edu/uci/ics/hyracks/control/cc/scheduler/DefaultJobRunStateMachine.java
@@ -52,6 +52,7 @@
 import edu.uci.ics.hyracks.control.cc.job.manager.events.JobCleanupEvent;
 import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
 import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
 
 public class DefaultJobRunStateMachine implements IJobRunStateMachine {
     private static final Logger LOGGER = Logger.getLogger(DefaultJobRunStateMachine.class.getName());
@@ -327,7 +328,8 @@
 
     private void buildTaskClusters(ActivityCluster ac) throws HyracksException {
         Map<ActivityNodeId, ActivityPartitionDetails> pcMap = computePartitionCounts(ac);
-        Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskStateMap();
+
+        Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
 
         for (ActivityNodeId anId : ac.getActivities()) {
             ActivityPartitionDetails apd = pcMap.get(anId);
@@ -337,11 +339,11 @@
             }
             taskStateMap.put(anId, taskStates);
         }
-
+        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = assignConnectorPolicy(ac, pcMap);
+        ac.setConnectorPolicyMap(connectorPolicies);
+        
         Set<ActivityNodeId> activities = ac.getActivities();
 
-        Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
-
         Map<TaskId, Set<TaskId>> taskClusterMap = new HashMap<TaskId, Set<TaskId>>();
         for (ActivityNodeId anId : activities) {
             Task[] taskStates = taskStateMap.get(anId);
@@ -364,7 +366,7 @@
                     ConnectorDescriptorId cdId = c.getConnectorId();
                     IConnectorPolicy cPolicy = connectorPolicies.get(cdId);
                     if (cPolicy == null) {
-                        cPolicy = new PipelinedConnectorPolicy();
+                        cPolicy = new SendSideMaterializedConnectorPolicy();
                     }
                     ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
                     Task[] ac2TaskStates = taskStateMap.get(ac2);
@@ -463,6 +465,41 @@
         }
     }
 
+    private Map<ConnectorDescriptorId, IConnectorPolicy> assignConnectorPolicy(ActivityCluster ac,
+            Map<ActivityNodeId, ActivityPartitionDetails> pcMap) {
+        JobActivityGraph jag = jobRun.getJobActivityGraph();
+        Map<ConnectorDescriptorId, IConnectorPolicy> cPolicyMap = new HashMap<ConnectorDescriptorId, IConnectorPolicy>();
+        Set<ActivityNodeId> activities = ac.getActivities();
+        Map<ActivityNodeId, Task[]> taskStateMap = ac.getTaskMap();
+        BitSet targetBitmap = new BitSet();
+        for (ActivityNodeId ac1 : activities) {
+            Task[] ac1TaskStates = taskStateMap.get(ac1);
+            int nProducers = ac1TaskStates.length;
+            List<IConnectorDescriptor> outputConns = jag.getActivityOutputConnectorDescriptors(ac1);
+            if (outputConns != null) {
+                for (IConnectorDescriptor c : outputConns) {
+                    ConnectorDescriptorId cdId = c.getConnectorId();
+                    ActivityNodeId ac2 = jag.getConsumerActivity(cdId);
+                    Task[] ac2TaskStates = taskStateMap.get(ac2);
+                    int nConsumers = ac2TaskStates.length;
+
+                    int[] fanouts = new int[nProducers];
+                    for (int i = 0; i < nProducers; ++i) {
+                        c.indicateTargetPartitions(nProducers, nConsumers, i, targetBitmap);
+                        fanouts[i] = targetBitmap.cardinality();
+                    }
+                    IConnectorPolicy cp = assignConnectorPolicy(c, nProducers, nConsumers, fanouts);
+                    cPolicyMap.put(cdId, cp);
+                }
+            }
+        }
+        return cPolicyMap;
+    }
+
+    private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers, int[] fanouts) {
+        return new PipelinedConnectorPolicy();
+    }
+
     private void computeDependencyClosure(Set<TaskCluster> tcSet) {
         boolean done = false;
         while (!done) {
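
assignConnectorPolicy(IConnectorDescriptor, ...) above returns PipelinedConnectorPolicy for every connector, even though per-sender fanouts are already computed. A hypothetical refinement, not part of this patch, showing the kind of rule the fanout array enables (threshold and rule are illustrative only):

// Hypothetical refinement, NOT part of this patch; the rule is illustrative only.
private IConnectorPolicy assignConnectorPolicy(IConnectorDescriptor c, int nProducers, int nConsumers,
        int[] fanouts) {
    int maxFanout = 0;
    for (int f : fanouts) {
        maxFanout = Math.max(maxFanout, f);
    }
    // Broadcast-like connectors (a producer feeds every consumer) are expensive to
    // co-schedule, so materialize on the send side and let consumers start later.
    if (nConsumers > 1 && maxFanout == nConsumers) {
        return new SendSideMaterializedConnectorPolicy();
    }
    return new PipelinedConnectorPolicy();
}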
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
index 4043baa..54b31f8 100644
--- a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/base/INodeController.java
@@ -16,12 +16,15 @@
 
 import java.rmi.Remote;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.TaskAttemptId;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
 
 public interface INodeController extends Remote {
     public String getId() throws Exception;
@@ -30,8 +33,8 @@
 
     public NodeCapability getNodeCapability() throws Exception;
 
-    public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors)
-            throws Exception;
+    public void startTasks(String appName, UUID jobId, byte[] planBytes, List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies) throws Exception;
 
     public void abortTasks(UUID jobId, List<TaskAttemptId> tasks) throws Exception;
 
diff --git a/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
new file mode 100644
index 0000000..235cd3c
--- /dev/null
+++ b/hyracks-control-common/src/main/java/edu/uci/ics/hyracks/control/common/job/dataflow/SendSideMaterializedConnectorPolicy.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.common.job.dataflow;
+
+public final class SendSideMaterializedConnectorPolicy implements IConnectorPolicy {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean requiresProducerConsumerCoscheduling() {
+        return false;
+    }
+
+    @Override
+    public boolean consumerWaitsForProducerToFinish() {
+        return true;
+    }
+}
\ No newline at end of file
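
For contrast, the pre-existing PipelinedConnectorPolicy (imported elsewhere in this patch but unchanged by it) presumably answers the two questions the other way around. A sketch of that presumed counterpart; its return values are assumptions inferred from how NodeControllerService treats the two policies:

package edu.uci.ics.hyracks.control.common.job.dataflow;

// Presumed shape of the existing class; return values are assumptions, not taken from this patch.
public final class PipelinedConnectorPolicy implements IConnectorPolicy {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean requiresProducerConsumerCoscheduling() {
        return true; // frames are streamed, so both sides must be running
    }

    @Override
    public boolean consumerWaitsForProducerToFinish() {
        return false; // the consumer reads frames as they are produced
    }
}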
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index 2f28b92..c7636f8 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -52,6 +52,7 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.comm.PartitionChannel;
 import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
 import edu.uci.ics.hyracks.api.dataflow.IActivityNode;
 import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
 import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
@@ -75,6 +76,9 @@
 import edu.uci.ics.hyracks.control.common.base.NodeParameters;
 import edu.uci.ics.hyracks.control.common.context.ServerContext;
 import edu.uci.ics.hyracks.control.common.job.TaskAttemptDescriptor;
+import edu.uci.ics.hyracks.control.common.job.dataflow.IConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.PipelinedConnectorPolicy;
+import edu.uci.ics.hyracks.control.common.job.dataflow.SendSideMaterializedConnectorPolicy;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.JobletProfile;
 import edu.uci.ics.hyracks.control.common.job.profiling.om.TaskProfile;
@@ -82,6 +86,7 @@
 import edu.uci.ics.hyracks.control.nc.io.IOManager;
 import edu.uci.ics.hyracks.control.nc.net.ConnectionManager;
 import edu.uci.ics.hyracks.control.nc.net.NetworkInputChannel;
+import edu.uci.ics.hyracks.control.nc.partitions.MaterializedPartitionWriter;
 import edu.uci.ics.hyracks.control.nc.partitions.PartitionManager;
 import edu.uci.ics.hyracks.control.nc.partitions.PipelinedPartition;
 import edu.uci.ics.hyracks.control.nc.runtime.RootHyracksContext;
@@ -137,6 +142,10 @@
         applications = new Hashtable<String, NCApplicationContext>();
     }
 
+    public IHyracksRootContext getRootContext() {
+        return ctx;
+    }
+
     private static List<IODeviceHandle> getDevices(String ioDevices) {
         List<IODeviceHandle> devices = new ArrayList<IODeviceHandle>();
         StringTokenizer tok = new StringTokenizer(ioDevices, ",");
@@ -169,6 +178,7 @@
     @Override
     public void stop() throws Exception {
         LOGGER.log(Level.INFO, "Stopping NodeControllerService");
+        partitionManager.close();
         connectionManager.stop();
         LOGGER.log(Level.INFO, "Stopped NodeControllerService");
     }
@@ -218,8 +228,10 @@
         return InetAddress.getByAddress(ipBytes);
     }
 
+    @Override
     public void startTasks(String appName, final UUID jobId, byte[] jagBytes,
-            List<TaskAttemptDescriptor> taskDescriptors) throws Exception {
+            List<TaskAttemptDescriptor> taskDescriptors,
+            Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap) throws Exception {
         try {
             NCApplicationContext appCtx = applications.get(appName);
             final JobActivityGraph plan = (JobActivityGraph) appCtx.deserialize(jagBytes);
@@ -273,13 +285,11 @@
                     for (int i = 0; i < outputs.size(); ++i) {
                         final IConnectorDescriptor conn = outputs.get(i);
                         RecordDescriptor recordDesc = plan.getJobSpecification().getConnectorRecordDescriptor(conn);
-                        IPartitionWriterFactory pwFactory = new IPartitionWriterFactory() {
-                            @Override
-                            public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
-                                return new PipelinedPartition(partitionManager, new PartitionId(jobId,
-                                        conn.getConnectorId(), partition, receiverIndex));
-                            }
-                        };
+                        IConnectorPolicy cPolicy = connectorPoliciesMap.get(conn.getConnectorId());
+
+                        IPartitionWriterFactory pwFactory = createPartitionWriterFactory(cPolicy, jobId, conn,
+                                partition);
+
                         if (LOGGER.isLoggable(Level.INFO)) {
                             LOGGER.info("output: " + i + ": " + conn.getConnectorId());
                         }
@@ -300,6 +310,28 @@
         }
     }
 
+    private IPartitionWriterFactory createPartitionWriterFactory(IConnectorPolicy cPolicy, final UUID jobId,
+            final IConnectorDescriptor conn, final int senderIndex) {
+        if (cPolicy instanceof PipelinedConnectorPolicy) {
+            return new IPartitionWriterFactory() {
+                @Override
+                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                    return new PipelinedPartition(partitionManager, new PartitionId(jobId, conn.getConnectorId(),
+                            senderIndex, receiverIndex));
+                }
+            };
+        } else if (cPolicy instanceof SendSideMaterializedConnectorPolicy) {
+            return new IPartitionWriterFactory() {
+                @Override
+                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
+                    return new MaterializedPartitionWriter(ctx, partitionManager, new PartitionId(jobId,
+                            conn.getConnectorId(), senderIndex, receiverIndex), executor);
+                }
+            };
+        }
+        throw new IllegalArgumentException("Unknown connector policy: " + cPolicy);
+    }
+
     private synchronized Joblet getOrCreateLocalJoblet(UUID jobId, INCApplicationContext appCtx) throws Exception {
         Joblet ji = jobletMap.get(jobId);
         if (ji == null) {
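
Regardless of which branch createPartitionWriterFactory() takes, producing tasks only see the IPartitionWriterFactory it returns. A method-level sketch of how one receiver's writer is driven (not part of the patch; it relies on the types already imported in NodeControllerService.java, and the frame argument is a placeholder):

// Illustrative fragment, not part of this patch.
static void pushOneFrame(IPartitionWriterFactory pwFactory, int receiverIndex, ByteBuffer frame)
        throws HyracksDataException {
    IFrameWriter writer = pwFactory.createFrameWriter(receiverIndex);
    writer.open();
    try {
        // PipelinedPartition hands the frame straight to the consumer side;
        // MaterializedPartitionWriter appends it to the backing workspace file.
        writer.nextFrame(frame);
    } finally {
        // For the materialized case, close() is what registers the finished
        // partition with the PartitionManager so consumers can fetch it.
        writer.close();
    }
}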
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
index 6833cbe..97f82d9 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartition.java
@@ -56,23 +56,28 @@
                 try {
                     FileHandle fh = ioManager.open(partitionFile, IIOManager.FileReadWriteMode.READ_ONLY,
                             IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
-                    writer.open();
                     try {
-                        long offset = 0;
-                        ByteBuffer buffer = ctx.allocateFrame();
-                        while (true) {
-                            buffer.clear();
-                            long size = ioManager.syncRead(fh, offset, buffer);
-                            if (size < 0) {
-                                break;
-                            } else if (size < buffer.capacity()) {
-                                throw new HyracksDataException("Premature end of file");
+                        writer.open();
+                        try {
+                            long offset = 0;
+                            ByteBuffer buffer = ctx.allocateFrame();
+                            while (true) {
+                                buffer.clear();
+                                long size = ioManager.syncRead(fh, offset, buffer);
+                                if (size < 0) {
+                                    break;
+                                } else if (size < buffer.capacity()) {
+                                    throw new HyracksDataException("Premature end of file");
+                                }
+                                offset += size;
+                                buffer.flip();
+                                writer.nextFrame(buffer);
                             }
-                            buffer.flip();
-                            writer.nextFrame(buffer);
+                        } finally {
+                            writer.close();
                         }
                     } finally {
-                        writer.close();
+                        ioManager.close(fh);
                     }
                 } catch (HyracksDataException e) {
                     throw new RuntimeException(e);
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
new file mode 100644
index 0000000..a74f099
--- /dev/null
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/MaterializedPartitionWriter.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executor;
+
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.io.FileHandle;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.partitions.PartitionId;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+
+public class MaterializedPartitionWriter implements IFrameWriter {
+    protected final IHyracksRootContext ctx;
+
+    protected final PartitionManager manager;
+
+    protected final PartitionId pid;
+
+    protected final Executor executor;
+
+    private FileReference fRef;
+
+    private FileHandle handle;
+
+    private long size;
+
+    public MaterializedPartitionWriter(IHyracksRootContext ctx, PartitionManager manager, PartitionId pid,
+            Executor executor) {
+        this.ctx = ctx;
+        this.manager = manager;
+        this.pid = pid;
+        this.executor = executor;
+    }
+
+    @Override
+    public void open() throws HyracksDataException {
+        fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(pid.toString());
+        handle = ctx.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        size = 0;
+    }
+
+    @Override
+    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+        size += ctx.getIOManager().syncWrite(handle, size, buffer);
+    }
+
+    @Override
+    public void flush() throws HyracksDataException {
+    }
+
+    @Override
+    public void close() throws HyracksDataException {
+        ctx.getIOManager().close(handle);
+        manager.registerPartition(pid, new MaterializedPartition(ctx, fRef, executor, (IOManager) ctx.getIOManager()));
+    }
+}
\ No newline at end of file
diff --git a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
index 27afb8c..4adb20a 100644
--- a/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
+++ b/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/partitions/PartitionManager.java
@@ -23,9 +23,13 @@
 import edu.uci.ics.hyracks.api.comm.NetworkAddress;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
 import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
 import edu.uci.ics.hyracks.api.partitions.IPartition;
 import edu.uci.ics.hyracks.api.partitions.PartitionId;
 import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
 
 public class PartitionManager implements IPartitionRequestListener {
     private final NetworkAddress dataPort;
@@ -34,10 +38,16 @@
 
     private final Map<PartitionId, IPartition> partitionMap;
 
+    private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+    private final IWorkspaceFileFactory fileFactory;
+
     public PartitionManager(NodeControllerService ncs, NetworkAddress dataPort) {
         this.dataPort = dataPort;
         this.ncs = ncs;
         partitionMap = new HashMap<PartitionId, IPartition>();
+        deallocatableRegistry = new DefaultDeallocatableRegistry();
+        fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
     }
 
     public void registerPartition(PartitionId pid, IPartition partition) throws HyracksDataException {
@@ -79,4 +89,12 @@
             throw new HyracksException("Request for unknown partition " + partitionId);
         }
     }
+
+    public IWorkspaceFileFactory getFileFactory() {
+        return fileFactory;
+    }
+
+    public void close() {
+        deallocatableRegistry.close();
+    }
 }
\ No newline at end of file