Added flush call to IFrameWriter
git-svn-id: https://hyracks.googlecode.com/svn/trunk@107 123451ca-8445-de46-9d55-352943316053
diff --git a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
index eccd6cb..0ded474 100644
--- a/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/edu/uci/ics/hyracks/api/comm/IFrameWriter.java
@@ -23,5 +23,7 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
+ public void flush() throws HyracksDataException;
+
public void close() throws HyracksDataException;
}
\ No newline at end of file
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
index b80d0f5..5677827 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/NodeControllerService.java
@@ -390,6 +390,11 @@
.put("framecount." + conn.getConnectorId().getId() + ".sender." + senderIndex + "."
+ receiverIndex, String.valueOf(frameCount));
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
} : writer;
}
diff --git a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
index b4868c0..927d67b 100644
--- a/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
+++ b/hyracks/hyracks-control-nc/src/main/java/edu/uci/ics/hyracks/control/nc/comm/ConnectionManager.java
@@ -229,6 +229,10 @@
@Override
public void open() throws HyracksDataException {
}
+
+ @Override
+ public void flush() {
+ }
}
private final class ConnectionListenerThread extends Thread {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
index ca7867f..0a56bea 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameDeserializingDataWriter.java
@@ -50,4 +50,8 @@
public void open() throws HyracksDataException {
writer.open();
}
+
+ @Override
+ public void flush() {
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
index bebf7d4..f5180f8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/base/AbstractUnaryOutputSourceOperatorNodePushable.java
@@ -23,4 +23,8 @@
public final void nextFrame(ByteBuffer buffer) throws HyracksDataException {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public final void flush() throws HyracksDataException {
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java
index 0bbf252..5e934c8 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/HashDataWriter.java
@@ -92,4 +92,18 @@
}
}
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ for (int i = 0; i < appenders.length; ++i) {
+ FrameTupleAppender appender = appenders[i];
+ if (appender.getTupleCount() > 0) {
+ ByteBuffer buffer = appender.getBuffer();
+ IFrameWriter frameWriter = epWriters[i];
+ flushFrame(buffer, frameWriter);
+ epWriters[i].flush();
+ appender.reset(buffer, true);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
index de8abb4..5346334 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
@@ -90,6 +90,20 @@
appenders[i].reset(appenders[i].getBuffer(), true);
}
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ for (int i = 0; i < appenders.length; ++i) {
+ FrameTupleAppender appender = appenders[i];
+ if (appender.getTupleCount() > 0) {
+ ByteBuffer buffer = appender.getBuffer();
+ IFrameWriter frameWriter = epWriters[i];
+ flushFrame(buffer, frameWriter);
+ epWriters[i].flush();
+ appender.reset(buffer, true);
+ }
+ }
+ }
}
private final int partitioningField;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 505c616..1ee7fda 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -35,8 +35,9 @@
private static final long serialVersionUID = 1L;
@Override
- public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc, IEndpointDataWriterFactory edwFactory,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IFrameWriter createSendSideWriter(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IEndpointDataWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
for (int i = 0; i < nConsumerPartitions; ++i) {
epWriters[i] = edwFactory.createFrameWriter(i);
@@ -66,12 +67,20 @@
epWriters[i].open();
}
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ for (int i = 0; i < epWriters.length; ++i) {
+ epWriters[i].flush();
+ }
+ }
};
}
@Override
- public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc, IConnectionDemultiplexer demux,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IFrameReader createReceiveSideReader(IHyracksContext ctx, RecordDescriptor recordDesc,
+ IConnectionDemultiplexer demux, int index, int nProducerPartitions, int nConsumerPartitions)
+ throws HyracksDataException {
return new NonDeterministicFrameReader(ctx, demux);
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
index 59d6ed5..a568781 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/group/HashGroupOperatorDescriptor.java
@@ -31,6 +31,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class HashGroupOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String HASHTABLE = "hashtable";
@@ -104,6 +105,10 @@
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
}
@@ -119,9 +124,7 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new IOperatorNodePushable() {
- private IFrameWriter writer;
-
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void open() throws HyracksDataException {
GroupingHashTable table = (GroupingHashTable) env.get(HASHTABLE);
@@ -132,22 +135,9 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new IllegalStateException();
- }
-
- @Override
public void close() throws HyracksDataException {
// do nothing
}
-
- @Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
};
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
index 0b254da..c124cb3 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/GraceHashJoinOperatorDescriptor.java
@@ -42,6 +42,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.partition.RepartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class GraceHashJoinOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String SMALLRELATION = "RelR";
@@ -213,6 +214,9 @@
}
}
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
return op;
}
@@ -236,10 +240,9 @@
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getOperatorId(), 1);
- IOperatorNodePushable op = new IOperatorNodePushable() {
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
private InMemoryHashJoin joiner;
- private IFrameWriter writer;
private FileChannel[] channelsR;
private FileChannel[] channelsS;
private int numPartitions;
@@ -318,24 +321,11 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new IllegalStateException();
- }
-
- @Override
public void close() throws HyracksDataException {
env.set(LARGERELATION, null);
env.set(SMALLRELATION, null);
env.set(NUM_PARTITION, null);
}
-
- @Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- if (index != 0) {
- throw new IllegalStateException();
- }
- this.writer = writer;
- }
};
return op;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index 24b655d9..1da6b61 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -313,6 +313,9 @@
ftappender.reset(inBuffer, true);
}
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
return op;
}
@@ -553,6 +556,11 @@
}
this.writer = writer;
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
};
return op;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index cf9a373..7e64312 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -116,6 +116,10 @@
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
return op;
}
@@ -161,6 +165,11 @@
}
this.writer = writer;
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
};
return op;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
index 6b3de09..0bfe83d 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/MaterializingOperatorDescriptor.java
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class MaterializingOperatorDescriptor extends AbstractOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -110,6 +111,10 @@
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
}
@@ -125,9 +130,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- return new IOperatorNodePushable() {
- private IFrameWriter writer;
-
+ return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void open() throws HyracksDataException {
try {
@@ -154,22 +157,9 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new IllegalStateException();
- }
-
- @Override
public void close() throws HyracksDataException {
env.set(MATERIALIZED_FILE, null);
}
-
- @Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
};
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
index 3b7b02c..b35954f 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/NullSinkOperatorDescriptor.java
@@ -38,6 +38,10 @@
@Override
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
index ea2a556..4594685 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/ExternalSortOperatorDescriptor.java
@@ -43,6 +43,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import edu.uci.ics.hyracks.dataflow.std.util.ReferenceEntry;
import edu.uci.ics.hyracks.dataflow.std.util.ReferencedPriorityQueue;
@@ -275,6 +276,10 @@
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
return op;
}
@@ -295,8 +300,7 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- IOperatorNodePushable op = new IOperatorNodePushable() {
- private IFrameWriter writer;
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
private List<ByteBuffer> inFrames;
private ByteBuffer outFrame;
LinkedList<File> runs;
@@ -336,23 +340,10 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new IllegalStateException();
- }
-
- @Override
public void close() throws HyracksDataException {
// do nothing
}
- @Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
-
// creates a new run from runs that can fit in memory.
private void doPass(LinkedList<File> runs, int passCount) throws ClassNotFoundException, Exception {
File newRun = null;
@@ -559,6 +550,10 @@
throw new HyracksDataException(e);
}
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
}
public static class RunFileReader implements IFrameReader {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 0b95a5a..be81ecd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -35,6 +35,7 @@
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractActivityNode;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class InMemorySortOperatorDescriptor extends AbstractOperatorDescriptor {
private static final String BUFFERS = "buffers";
@@ -208,6 +209,10 @@
public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
throw new IllegalArgumentException();
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
};
return op;
}
@@ -224,9 +229,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksContext ctx, final IOperatorEnvironment env,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
- IOperatorNodePushable op = new IOperatorNodePushable() {
- private IFrameWriter writer;
-
+ IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void open() throws HyracksDataException {
List<ByteBuffer> buffers = (List<ByteBuffer>) env.get(BUFFERS);
@@ -265,22 +268,9 @@
}
@Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- throw new IllegalStateException();
- }
-
- @Override
public void close() throws HyracksDataException {
// do nothing
}
-
- @Override
- public void setFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
- if (index != 0) {
- throw new IllegalArgumentException();
- }
- this.writer = writer;
- }
};
return op;
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
index 13bba15..037e546 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/util/DeserializedOperatorNodePushable.java
@@ -61,4 +61,8 @@
public void open() throws HyracksDataException {
delegate.open();
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
index 6ddc8ec..56569d9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/comm/SerializationDeserializationTest.java
@@ -69,6 +69,10 @@
public void close() throws HyracksDataException {
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
});
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
index e88cca4..d6b0036 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeBulkLoadOperatorNodePushable.java
@@ -68,5 +68,8 @@
e.printStackTrace();
}
}
-
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
index 47c612b..74c878a 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeDiskOrderScanOperatorNodePushable.java
@@ -90,4 +90,8 @@
@Override
public void close() throws HyracksDataException {
}
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java
index 6cc64ee..eddbbfa 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeInsertOperatorNodePushable.java
@@ -63,5 +63,9 @@
} catch (Exception e) {
e.printStackTrace();
}
- }
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
}
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 69f6490..e360be2 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/edu/uci/ics/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -108,5 +108,9 @@
@Override
public void close() throws HyracksDataException {
- }
-}
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ }
+}
\ No newline at end of file