More refactoring.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 7b61021..b4703b9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -84,7 +84,7 @@
synchronized (this) {
ncs.getClusterController().registerResultPartitionLocation(jobId, rsId, orderedResult, partition,
nPartitions, ncs.getDatasetNetworkManager().getNetworkAddress());
- dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager);
+ dpw = new DatasetPartitionWriter(ctx, this, jobId, rsId, partition, datasetMemoryManager, fileFactory);
ResultState[] resultStates = partitionResultStateMap.get(jobId);
if (resultStates == null) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index c1f8f29..2abbed5 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -24,15 +24,13 @@
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
-import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
public class DatasetPartitionWriter implements IFrameWriter {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
- private static final String FILE_PREFIX = "result_";
-
private final IDatasetPartitionManager manager;
private final JobId jobId;
@@ -48,7 +46,8 @@
private final ResultState resultState;
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
- ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
+ ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager,
+ IWorkspaceFileFactory fileFactory) {
this.manager = manager;
this.jobId = jobId;
this.resultSetId = rsId;
@@ -56,7 +55,7 @@
this.datasetMemoryManager = datasetMemoryManager;
resultSetPartitionId = new ResultSetPartitionId(jobId, rsId, partition);
- resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), ctx.getFrameSize());
+ resultState = new ResultState(resultSetPartitionId, ctx.getIOManager(), fileFactory, ctx.getFrameSize());
}
public ResultState getResultState() {
@@ -64,13 +63,11 @@
}
@Override
- public void open() throws HyracksDataException {
+ public void open() {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("open(" + partition + ")");
}
- String fName = FILE_PREFIX + String.valueOf(partition);
- FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
- resultState.open(fRef);
+ resultState.open();
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 70d9714..a91db17 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -28,16 +28,21 @@
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
public class ResultState implements IStateObject {
+ private static final String FILE_PREFIX = "result_";
+
private final ResultSetPartitionId resultSetPartitionId;
private final int frameSize;
private final IIOManager ioManager;
+ private final IWorkspaceFileFactory fileFactory;
+
private final AtomicBoolean eos;
private final List<Page> localPageList;
@@ -52,22 +57,22 @@
private long persistentSize;
- ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+ ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, IWorkspaceFileFactory fileFactory,
+ int frameSize) {
this.resultSetPartitionId = resultSetPartitionId;
this.ioManager = ioManager;
+ this.fileFactory = fileFactory;
this.frameSize = frameSize;
eos = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
+ fileRef = null;
writeFileHandle = null;
}
- public synchronized void open(FileReference fileRef) throws HyracksDataException {
- this.fileRef = fileRef;
-
+ public synchronized void open() {
size = 0;
persistentSize = 0;
- notifyAll();
}
public synchronized void close() {
@@ -169,9 +174,12 @@
page.getBuffer().flip();
- if (writeFileHandle == null) {
+ if (fileRef == null) {
+ String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ notifyAll();
}
long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());