Update the ResultWriterOperatorDescriptor to work with the new network layer implementation.
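
The dataset partition writer is now created from the task context instead of a
JobId extracted by the operator, and it has to be opened explicitly before any
frames are pushed. Roughly (a sketch only; "dpm" and "writer" are shorthand for
the IDatasetPartitionManager and the writer it returns):

    // before: the operator pulled the job id out of the context itself
    JobId jobId = ctx.getJobletContext().getJobId();
    writer = dpm.createDatasetPartitionWriter(jobId, partition, nPartitions);

    // after: the task context is handed to the manager, and the writer is
    // opened explicitly once it has been created
    writer = dpm.createDatasetPartitionWriter(ctx, partition, nPartitions);
    writer.open();

Failures while creating the writer surface as HyracksException and are
rewrapped as HyracksDataException in the pushable's open().
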
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2516 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
index 5c8a0c2..31b9af8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/result/ResultWriterOperatorDescriptor.java
@@ -22,8 +22,8 @@
 import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
 import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
-import edu.uci.ics.hyracks.api.job.JobId;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 
@@ -35,9 +35,8 @@
     }
 
     @Override
-    public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions) {
-        final JobId jobId = ctx.getJobletContext().getJobId();
         final IDatasetPartitionManager dpm = ctx.getDatasetPartitionManager();
 
         return new AbstractUnaryInputSinkOperatorNodePushable() {
@@ -45,7 +44,12 @@
 
             @Override
             public void open() throws HyracksDataException {
-                datasetPartitionWriter = dpm.createDatasetPartitionWriter(jobId, partition, nPartitions);
+                try {
+                    datasetPartitionWriter = dpm.createDatasetPartitionWriter(ctx, partition, nPartitions);
+                    datasetPartitionWriter.open();
+                } catch (HyracksException e) {
+                    throw new HyracksDataException(e);
+                }
             }
 
             @Override