Implement a class for storing the state of the results.
git-svn-id: https://hyracks.googlecode.com/svn/branches/fullstack_hyracks_result_distribution@3064 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
new file mode 100644
index 0000000..7275dfd
--- /dev/null
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/dataset/Page.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.api.dataset;
+
+import java.nio.ByteBuffer;
+
+public class Page {
+ private final ByteBuffer buffer;
+
+ public Page(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ public ByteBuffer clear() {
+ return (ByteBuffer) buffer.clear();
+ }
+}
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
new file mode 100644
index 0000000..d2e86bc
--- /dev/null
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/dataset/ResultState.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.control.nc.dataset;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import edu.uci.ics.hyracks.api.dataflow.state.IStateObject;
+import edu.uci.ics.hyracks.api.dataset.Page;
+import edu.uci.ics.hyracks.api.io.FileReference;
+import edu.uci.ics.hyracks.api.io.IIOManager;
+import edu.uci.ics.hyracks.api.job.JobId;
+import edu.uci.ics.hyracks.api.partitions.ResultSetPartitionId;
+
+public class ResultState implements IStateObject {
+ private final ResultSetPartitionId resultSetPartitionId;
+
+ private final int frameSize;
+
+ private final IIOManager ioManager;
+
+ private final AtomicBoolean eos;
+
+ private final AtomicBoolean readEOS;
+
+ private final List<Page> localPageList;
+
+ private FileReference fileRef;
+
+ private long size;
+
+ private long persistentSize;
+
+ ResultState(ResultSetPartitionId resultSetPartitionId, IIOManager ioManager, int frameSize) {
+ this.resultSetPartitionId = resultSetPartitionId;
+ this.ioManager = ioManager;
+ this.frameSize = frameSize;
+ eos = new AtomicBoolean(false);
+ readEOS = new AtomicBoolean(false);
+ localPageList = new ArrayList<Page>();
+ }
+
+ public void init(FileReference fileRef) {
+ this.fileRef = fileRef;
+
+ size = 0;
+ persistentSize = 0;
+ }
+
+ public ResultSetPartitionId getResultSetPartitionId() {
+ return resultSetPartitionId;
+ }
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ public IIOManager getIOManager() {
+ return ioManager;
+ }
+
+ public synchronized void incrementSize(long delta) {
+ size += delta;
+ }
+
+ public synchronized long getSize() {
+ return size;
+ }
+
+ public synchronized void incrementPersistentSize(long delta) {
+ persistentSize += delta;
+ }
+
+ public synchronized long getPersistentSize() {
+ return persistentSize;
+ }
+
+ public void setEOS(boolean eos) {
+ this.eos.set(eos);
+ }
+
+ public boolean getEOS() {
+ return eos.get();
+ }
+
+ public boolean getReadEOS() {
+ return readEOS.get();
+ }
+
+ public synchronized void addPage(Page page) {
+ localPageList.add(page);
+ }
+
+ public synchronized Page removePage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.remove(index);
+ }
+ return page;
+ }
+
+ public synchronized Page getPage(int index) {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(index);
+ }
+ return page;
+ }
+
+ public synchronized Page getLastPage() {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(localPageList.size() - 1);
+ }
+ return page;
+ }
+
+ public synchronized Page getFirstPage() {
+ Page page = null;
+ if (!localPageList.isEmpty()) {
+ page = localPageList.get(0);
+ }
+ return page;
+ }
+
+ public FileReference getFileReference() {
+ return fileRef;
+ }
+
+ @Override
+ public JobId getJobId() {
+ return resultSetPartitionId.getJobId();
+ }
+
+ @Override
+ public Object getId() {
+ return resultSetPartitionId;
+ }
+
+ @Override
+ public long getMemoryOccupancy() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void toBytes(DataOutput out) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void fromBytes(DataInput in) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file