Refactor result caching mechanism.
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
deleted file mode 100644
index 8f5ed64..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionReader.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-
-public interface IDatasetPartitionReader {
- public void writeTo(IFrameWriter writer);
-}
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
deleted file mode 100644
index 42dc157..0000000
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/IDatasetPartitionWriter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2009-2010 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.api.dataset;
-
-import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-
-public interface IDatasetPartitionWriter extends IFrameWriter {
- public Page returnPage() throws HyracksDataException;
-}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
index 2386849..4a3c72d 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetMemoryManager.java
@@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Set;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
@@ -50,7 +49,7 @@
resultPartitionNodesMap = new HashMap<ResultSetPartitionId, PartitionNode>();
}
- public Page requestPage(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter dpw)
+ public Page requestPage(ResultSetPartitionId resultSetPartitionId, ResultState resultState)
throws OutOfMemoryError, HyracksDataException {
Page page;
if (availPages.isEmpty()) {
@@ -76,7 +75,7 @@
* update reference call before a page is pushed on to the element of the LRU list. So we first obtain the page,
* then make a updateReference call which in turn creates a new node in the LRU list and then add the page to it.
*/
- PartitionNode pn = updateReference(resultSetPartitionId, dpw);
+ PartitionNode pn = updateReference(resultSetPartitionId, resultState);
pn.add(page);
return page;
}
@@ -96,12 +95,12 @@
}
protected synchronized PartitionNode updateReference(ResultSetPartitionId resultSetPartitionId,
- IDatasetPartitionWriter dpw) {
+ ResultState resultState) {
PartitionNode pn = null;
if (!resultPartitionNodesMap.containsKey(resultSetPartitionId)) {
- if (dpw != null) {
- pn = new PartitionNode(resultSetPartitionId, dpw);
+ if (resultState != null) {
+ pn = new PartitionNode(resultSetPartitionId, resultState);
insertPartitionNode(resultSetPartitionId, pn);
}
return pn;
@@ -115,8 +114,8 @@
protected synchronized Page evictPage() throws HyracksDataException {
PartitionNode pn = leastRecentlyUsedList.getFirst();
- IDatasetPartitionWriter dpw = pn.getDatasetPartitionWriter();
- Page page = dpw.returnPage();
+ ResultState resultState = pn.getResultState();
+ Page page = resultState.returnPage();
/* If the partition holding the pages breaks the contract by not returning the page or it has no page, just take
* away all the pages allocated to it and add to the available pages set.
@@ -202,15 +201,15 @@
private final ResultSetPartitionId resultSetPartitionId;
- private final IDatasetPartitionWriter datasetPartitionWriter;
+ private final ResultState resultState;
private PartitionNode prev;
private PartitionNode next;
- public PartitionNode(ResultSetPartitionId resultSetPartitionId, IDatasetPartitionWriter datasetPartitionWriter) {
+ public PartitionNode(ResultSetPartitionId resultSetPartitionId, ResultState resultState) {
this.resultSetPartitionId = resultSetPartitionId;
- this.datasetPartitionWriter = datasetPartitionWriter;
+ this.resultState = resultState;
prev = null;
next = null;
}
@@ -219,8 +218,8 @@
return resultSetPartitionId;
}
- public IDatasetPartitionWriter getDatasetPartitionWriter() {
- return datasetPartitionWriter;
+ public ResultState getResultState() {
+ return resultState;
}
public void setPrev(PartitionNode node) {
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
index 9b92a22..7b61021 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionManager.java
@@ -22,7 +22,6 @@
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.IWorkspaceFileFactory;
@@ -64,7 +63,7 @@
for (int i = 0; i < resultStates.length; i++) {
ResultState state = resultStates[i];
if (state != null) {
- state.deinit();
+ state.closeAndDelete();
LOGGER.fine("Removing partition: " + i + " for JobId: " + eldest.getKey());
}
}
@@ -141,7 +140,7 @@
}
}
- IDatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
+ DatasetPartitionReader dpr = new DatasetPartitionReader(datasetMemoryManager, executor, resultState);
dpr.writeTo(writer);
LOGGER.fine("Initialized partition reader: JobId: " + jobId + ":partition: " + partition);
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
index dbe3e4d..f47f5a9 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionReader.java
@@ -20,14 +20,10 @@
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionReader;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.comm.channels.NetworkOutputChannel;
-public class DatasetPartitionReader implements IDatasetPartitionReader {
+public class DatasetPartitionReader {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionReader.class.getName());
private final DatasetMemoryManager datasetMemoryManager;
@@ -36,51 +32,12 @@
private final ResultState resultState;
- private IFileHandle fileHandle;
-
public DatasetPartitionReader(DatasetMemoryManager datasetMemoryManager, Executor executor, ResultState resultState) {
this.datasetMemoryManager = datasetMemoryManager;
this.executor = executor;
this.resultState = resultState;
}
- private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
- long readSize = 0;
- synchronized (resultState) {
- while (offset >= resultState.getSize() && !resultState.getEOS()) {
- try {
- resultState.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
- }
- }
- }
-
- if (offset >= resultState.getSize() && resultState.getEOS()) {
- return readSize;
- }
-
- if (offset < resultState.getPersistentSize()) {
- readSize = resultState.getIOManager().syncRead(fileHandle, offset, buffer);
- }
-
- if (readSize < buffer.capacity()) {
- long localPageOffset = offset - resultState.getPersistentSize();
- int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
- int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
- Page page = resultState.getPage(localPageIndex);
- if (page == null) {
- return readSize;
- }
- readSize += buffer.remaining();
- buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
- }
-
- datasetMemoryManager.pageReferenced(resultState.getResultSetPartitionId());
- return readSize;
- }
-
- @Override
public void writeTo(final IFrameWriter writer) {
executor.execute(new Runnable() {
@Override
@@ -88,8 +45,7 @@
NetworkOutputChannel channel = (NetworkOutputChannel) writer;
channel.setFrameSize(resultState.getFrameSize());
try {
- fileHandle = resultState.getIOManager().open(resultState.getValidFileReference(),
- IIOManager.FileReadWriteMode.READ_ONLY, IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ resultState.readOpen();
channel.open();
try {
long offset = 0;
@@ -109,7 +65,7 @@
}
} finally {
channel.close();
- resultState.getIOManager().close(fileHandle);
+ resultState.readClose();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -120,6 +76,10 @@
LOGGER.info("result reading successful(" + resultState.getResultSetPartitionId() + ")");
}
}
+
+ private long read(long offset, ByteBuffer buffer) throws HyracksDataException {
+ return resultState.read(datasetMemoryManager, offset, buffer);
+ }
});
}
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
index 317f553..c1f8f29 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/DatasetPartitionWriter.java
@@ -18,20 +18,17 @@
import java.util.logging.Level;
import java.util.logging.Logger;
+import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionManager;
-import edu.uci.ics.hyracks.api.dataset.IDatasetPartitionWriter;
-import edu.uci.ics.hyracks.api.dataset.Page;
import edu.uci.ics.hyracks.api.dataset.ResultSetId;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.io.FileReference;
-import edu.uci.ics.hyracks.api.io.IFileHandle;
-import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.api.job.JobId;
import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
-public class DatasetPartitionWriter implements IDatasetPartitionWriter {
+public class DatasetPartitionWriter implements IFrameWriter {
private static final Logger LOGGER = Logger.getLogger(DatasetPartitionWriter.class.getName());
private static final String FILE_PREFIX = "result_";
@@ -50,8 +47,6 @@
private final ResultState resultState;
- private IFileHandle fileHandle;
-
public DatasetPartitionWriter(IHyracksTaskContext ctx, IDatasetPartitionManager manager, JobId jobId,
ResultSetId rsId, int partition, DatasetMemoryManager datasetMemoryManager) {
this.manager = manager;
@@ -75,30 +70,12 @@
}
String fName = FILE_PREFIX + String.valueOf(partition);
FileReference fRef = manager.getFileFactory().createUnmanagedWorkspaceFile(fName);
- fileHandle = resultState.getIOManager().open(fRef, IIOManager.FileReadWriteMode.READ_WRITE,
- IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
- resultState.init(fRef, fileHandle);
+ resultState.open(fRef);
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- int srcOffset = 0;
- Page destPage = resultState.getLastPage();
-
- while (srcOffset < buffer.limit()) {
- if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
- destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
- resultState.addPage(destPage);
- }
- int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
- destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
- srcOffset += srcLength;
- resultState.incrementSize(srcLength);
- }
-
- synchronized (resultState) {
- resultState.notifyAll();
- }
+ resultState.write(datasetMemoryManager, buffer);
}
@Override
@@ -117,32 +94,10 @@
}
try {
- synchronized (resultState) {
- resultState.setEOS(true);
- resultState.notifyAll();
- }
+ resultState.close();
manager.reportPartitionWriteCompletion(jobId, resultSetId, partition);
} catch (HyracksException e) {
throw new HyracksDataException(e);
}
}
-
- @Override
- public Page returnPage() throws HyracksDataException {
- Page page = resultState.removePage(0);
-
- IIOManager ioManager = resultState.getIOManager();
-
- // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
- if (page == null) {
- ioManager.close(fileHandle);
- return null;
- }
-
- page.getBuffer().flip();
-
- long delta = ioManager.syncWrite(fileHandle, resultState.getPersistentSize(), page.getBuffer());
- resultState.incrementPersistentSize(delta);
- return page;
- }
}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
index 661df93..097c663 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -17,12 +17,14 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IFileHandle;
import edu.uci.ics.hyracks.api.io.IIOManager;
@@ -38,14 +40,14 @@
private final AtomicBoolean eos;
- private final AtomicBoolean readEOS;
-
private final List<Page> localPageList;
private FileReference fileRef;
private IFileHandle writeFileHandle;
+ private IFileHandle readFileHandle;
+
private long size;
private long persistentSize;
@@ -55,20 +57,26 @@
this.ioManager = ioManager;
this.frameSize = frameSize;
eos = new AtomicBoolean(false);
- readEOS = new AtomicBoolean(false);
localPageList = new ArrayList<Page>();
}
- public synchronized void init(FileReference fileRef, IFileHandle writeFileHandle) {
+ public synchronized void open(FileReference fileRef) throws HyracksDataException {
this.fileRef = fileRef;
- this.writeFileHandle = writeFileHandle;
+
+ writeFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
size = 0;
persistentSize = 0;
notifyAll();
}
- public synchronized void deinit() {
+ public synchronized void close() {
+ eos.set(true);
+ notifyAll();
+ }
+
+ public synchronized void closeAndDelete() {
if (writeFileHandle != null) {
try {
ioManager.close(writeFileHandle);
@@ -79,6 +87,98 @@
fileRef.delete();
}
+ public synchronized void write(DatasetMemoryManager datasetMemoryManager, ByteBuffer buffer)
+ throws HyracksDataException {
+ int srcOffset = 0;
+ Page destPage = getPage(localPageList.size() - 1);
+
+ while (srcOffset < buffer.limit()) {
+ if ((destPage == null) || (destPage.getBuffer().remaining() <= 0)) {
+ destPage = datasetMemoryManager.requestPage(resultSetPartitionId, this);
+ localPageList.add(destPage);
+ }
+ int srcLength = Math.min(buffer.limit() - srcOffset, destPage.getBuffer().remaining());
+ destPage.getBuffer().put(buffer.array(), srcOffset, srcLength);
+ srcOffset += srcLength;
+ size += srcLength;
+ }
+
+ notifyAll();
+ }
+
+ public synchronized void readOpen() throws InterruptedException, HyracksDataException {
+ while (fileRef == null) {
+ wait();
+ }
+ readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
+
+ public synchronized void readClose() throws InterruptedException, HyracksDataException {
+ while (fileRef == null) {
+ wait();
+ }
+ readFileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_ONLY,
+ IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+ }
+
+ public synchronized long read(DatasetMemoryManager datasetMemoryManager, long offset, ByteBuffer buffer)
+ throws HyracksDataException {
+ long readSize = 0;
+ synchronized (this) {
+ while (offset >= size && !eos.get()) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ if (offset >= size && eos.get()) {
+ return readSize;
+ }
+
+ if (offset < persistentSize) {
+ readSize = ioManager.syncRead(readFileHandle, offset, buffer);
+ }
+
+ if (readSize < buffer.capacity()) {
+ long localPageOffset = offset - persistentSize;
+ int localPageIndex = (int) (localPageOffset / datasetMemoryManager.getPageSize());
+ int pageOffset = (int) (localPageOffset % datasetMemoryManager.getPageSize());
+ Page page = getPage(localPageIndex);
+ if (page == null) {
+ return readSize;
+ }
+ readSize += buffer.remaining();
+ buffer.put(page.getBuffer().array(), pageOffset, buffer.remaining());
+ }
+
+ datasetMemoryManager.pageReferenced(resultSetPartitionId);
+ return readSize;
+ }
+
+ public synchronized Page returnPage() throws HyracksDataException {
+ Page page = removePage(0);
+
+ // If we do not have any pages to be given back close the write channel since we don't write any more, return null.
+ if (page == null) {
+ ioManager.close(writeFileHandle);
+ return null;
+ }
+
+ page.getBuffer().flip();
+
+ long delta = ioManager.syncWrite(writeFileHandle, persistentSize, page.getBuffer());
+ persistentSize += delta;
+ return page;
+ }
+
+ public synchronized void setEOS(boolean eos) {
+ this.eos.set(eos);
+ }
+
public ResultSetPartitionId getResultSetPartitionId() {
return resultSetPartitionId;
}
@@ -91,76 +191,6 @@
return ioManager;
}
- public synchronized void incrementSize(long delta) {
- size += delta;
- }
-
- public synchronized long getSize() {
- return size;
- }
-
- public synchronized void incrementPersistentSize(long delta) {
- persistentSize += delta;
- }
-
- public synchronized long getPersistentSize() {
- return persistentSize;
- }
-
- public void setEOS(boolean eos) {
- this.eos.set(eos);
- }
-
- public boolean getEOS() {
- return eos.get();
- }
-
- public boolean getReadEOS() {
- return readEOS.get();
- }
-
- public synchronized void addPage(Page page) {
- localPageList.add(page);
- }
-
- public synchronized Page removePage(int index) {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.remove(index);
- }
- return page;
- }
-
- public synchronized Page getPage(int index) {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(index);
- }
- return page;
- }
-
- public synchronized Page getLastPage() {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(localPageList.size() - 1);
- }
- return page;
- }
-
- public synchronized Page getFirstPage() {
- Page page = null;
- if (!localPageList.isEmpty()) {
- page = localPageList.get(0);
- }
- return page;
- }
-
- public synchronized FileReference getValidFileReference() throws InterruptedException {
- while (fileRef == null)
- wait();
- return fileRef;
- }
-
@Override
public JobId getJobId() {
return resultSetPartitionId.getJobId();
@@ -185,4 +215,20 @@
public void fromBytes(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
+
+ private Page getPage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(index);
+ }
+ return page;
+ }
+
+ private Page removePage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.remove(index);
+ }
+ return page;
+ }
}
\ No newline at end of file