Update the ResultWriterOperatorDescriptor to work with the new network layer implementation.

git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2516 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 5c8a0c2..31b9af8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -22,8 +22,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
@@ -35,9 +35,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-        final JobId jobId = ctx.getJobletContext().getJobId();
         final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -45,7 +44,12 @@
 
             @Override
             public void open() throws HyracksDataException {
-                datasetPartitionWriter = dpm.createDatasetPartitionWriter(jobId, partition, nPartitions);
+                try {
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, partition, nPartitions);
+                    datasetPartitionWriter.open();
+                } catch (HyracksException e) {
+                    throw new HyracksDataException(e);
+                }
             }
 
             @Override