Update the implementation of DatasetPartitionManger along with its interface to support the network communication completely.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@2515 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
index adc94b0..9e1067e 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionManager.java
@@ -15,10 +15,19 @@
package edu.uci.ics.hyracks.api.dataset;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
public interface IDatasetPartitionManager {
- public IFrameWriter createDatasetPartitionWriter(JobId jobId, int partition, int nPartitions)
- throws HyracksDataException;
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
+ throws HyracksException;
+
+ public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter noc)
+ throws HyracksException;
+
+ public IWorkspaceFileFactory getFileFactory();
+
+ public void close();
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 6b04dcd..9d9d5fc 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -16,38 +16,84 @@
import java.util.HashMap;
import java.util.Map;
-
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.concurrent.Executor;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.exceptions.HyracksException;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.control.nc.NodeControllerService;
+import edu.uci.ics.hyracks.control.nc.io.IOManager;
+import edu.uci.ics.hyracks.control.nc.io.WorkspaceFileFactory;
+import edu.uci.ics.hyracks.control.nc.resources.DefaultDeallocatableRegistry;
public class DatasetPartitionManager implements IDatasetPartitionManager {
private final NodeControllerService ncs;
- private final Map<Pair<JobId, Integer>, IFrameWriter> partitionDatasetWriterMap;
+ private final Executor executor;
- public DatasetPartitionManager(NodeControllerService ncs) {
+ private final Map<JobId, DatasetPartitionWriter[]> partitionDatasetWriterMap;
+
+ private final DefaultDeallocatableRegistry deallocatableRegistry;
+
+ private final IWorkspaceFileFactory fileFactory;
+
+ public DatasetPartitionManager(NodeControllerService ncs, Executor executor) {
this.ncs = ncs;
- partitionDatasetWriterMap = new HashMap<Pair<JobId, Integer>, IFrameWriter>();
+ this.executor = executor;
+ partitionDatasetWriterMap = new HashMap<JobId, DatasetPartitionWriter[]>();
+ deallocatableRegistry = new DefaultDeallocatableRegistry();
+ fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
}
@Override
- public IFrameWriter createDatasetPartitionWriter(JobId jobId, int partition, int nPartitions)
- throws HyracksDataException {
+ public IFrameWriter createDatasetPartitionWriter(IHyracksTaskContext ctx, int partition, int nPartitions)
+ throws HyracksException {
DatasetPartitionWriter dpw = null;
+ JobId jobId = ctx.getJobletContext().getJobId();
try {
ncs.getClusterController().registerResultPartitionLocation(jobId, partition, nPartitions,
- ncs.getNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter();
- partitionDatasetWriterMap.put(Pair.<JobId, Integer> of(jobId, partition), dpw);
+ ncs.getDatasetNetworkManager().getNetworkAddress());
+ dpw = new DatasetPartitionWriter(ctx, this, partition, executor);
+
+ DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
+ if (writers == null) {
+ writers = new DatasetPartitionWriter[nPartitions];
+ partitionDatasetWriterMap.put(jobId, writers);
+ }
+ writers[partition] = dpw;
} catch (Exception e) {
- throw new HyracksDataException(e);
+ throw new HyracksException(e);
}
return dpw;
}
+
+ @Override
+ public void initializeDatasetPartitionReader(JobId jobId, int partition, IFrameWriter writer)
+ throws HyracksException {
+ DatasetPartitionWriter[] writers = partitionDatasetWriterMap.get(jobId);
+ if (writers == null) {
+ throw new HyracksException("Unknown JobId " + jobId);
+ }
+
+ DatasetPartitionWriter dpw = writers[partition];
+ if (dpw == null) {
+ throw new HyracksException("No DatasetPartitionWriter for partition " + partition);
+ }
+
+ dpw.writeTo(writer);
+ }
+
+ @Override
+ public IWorkspaceFileFactory getFileFactory() {
+ return fileFactory;
+ }
+
+ @Override
+ public void close() {
+ deallocatableRegistry.close();
+ }
}