Don't use memory manager when no buffer space is given.
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index 8fef455..4e27f12 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -85,7 +85,7 @@
updateReference(resultSetPartitionId, null);
}
- public int getPageSize() {
+ public static int getPageSize() {
return FRAME_SIZE;
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index a632120..ef2902e 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -52,7 +52,11 @@
this.executor = executor;
deallocatableRegistry = new DefaultDeallocatableRegistry();
fileFactory = new WorkspaceFileFactory(deallocatableRegistry, (IOManager) ncs.getRootContext().getIOManager());
- datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+ if (availableMemory >= DatasetMemoryManager.getPageSize()) {
+ datasetMemoryManager = new DatasetMemoryManager(availableMemory);
+ } else {
+ datasetMemoryManager = null;
+ }
partitionResultStateMap = new LinkedHashMap<JobId, ResultState[]>() {
private static final long serialVersionUID = 1L;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index 2b0a2e7..07624de 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -76,7 +76,11 @@
}
private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- return resultState.read(datasetMemoryManager, offset, buffer);
+ if (datasetMemoryManager == null) {
+ return resultState.read(offset, buffer);
+ } else {
+ return resultState.read(datasetMemoryManager, offset, buffer);
+ }
}
});
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 51cfc0d..8f4b639 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -72,7 +72,11 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- resultState.write(datasetMemoryManager, buffer);
+ if (datasetMemoryManager == null) {
+ resultState.write(buffer);
+ } else {
+ resultState.write(datasetMemoryManager, buffer);
+ }
}
@Override
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 09d1bff..911f372 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -99,6 +99,19 @@
}
}
+ public synchronized void write(ByteBuffer buffer) throws HyracksDataException {
+ if (fileRef == null) {
+ String fName = FILE_PREFIX + String.valueOf(resultSetPartitionId.getPartition());
+ fileRef = fileFactory.createUnmanagedWorkspaceFile(fName);
+ writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
+
+ size += ioManager.syncWrite(writeFileHandle, size, buffer);
+
+ notifyAll();
+ }
+
public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
throws HyracksDataException {
int srcOffset = 0;
@@ -132,6 +145,28 @@
}
}
+ public synchronized long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ long readSize = 0;
+
+ while (offset >= size && !eos.get() && !failed.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ if ((offset >= size && eos.get()) || failed.get()) {
+ return readSize;
+ }
+
+ if (readFileHandle == null) {
+ initReadFileHandle();
+ }
+ readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+
+ return readSize;
+ }
+
public long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
throws HyracksDataException {
long readSize = 0;
@@ -157,8 +192,8 @@
if (readSize < buffer.capacity()) {
long localPageOffset = offset - persistentSize;
- int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
- int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+ int localPageIndex = (int) (localPageOffset / DatasetMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % DatasetMemoryManager.getPageSize());
Page page = getPage(localPageIndex);
if (page == null) {
return readSize;