[NO ISSUE][RT] Follow IFrameWriter protocol in AbstractOneInputPushRuntime
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Many implementations of AbstractOneInputPushRuntime didn't
follow the IFrameWriter protocol causing many unexpected
runtime exceptions.
- This change ensures that all of the subclasses implement the
protocol correctly.
Change-Id: I5133007f298366f58b53acc9f48bc553724dd7b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2884
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mblow@apache.org>
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 74ba139..2692cc7 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -89,7 +89,7 @@
return;
}
initAccessAppend(ctx);
- writer.open();
+ super.open();
} catch (ACIDException e) {
throw HyracksDataException.create(e);
}
@@ -142,31 +142,6 @@
}
@Override
- public void fail() throws HyracksDataException {
- failed = true;
- if (isSink) {
- return;
- }
- writer.fail();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (isSink) {
- return;
- }
- try {
- flushIfNotFailed();
- } catch (Exception e) {
- writer.fail();
- throw e;
- } finally {
- writer.close();
- }
- appender.reset(frame, true);
- }
-
- @Override
public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
this.inputRecordDesc = recordDescriptor;
this.tAccess = new FrameTupleAccessor(inputRecordDesc);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index e99b61b..1f9cb91 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -67,7 +67,6 @@
private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
private boolean first = true;
- private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -81,8 +80,7 @@
for (int i = 0; i < aggregFactories.length; i++) {
aggregs[i].init();
}
- isOpen = true;
- writer.open();
+ super.open();
}
@Override
@@ -121,14 +119,6 @@
aggregs[f].step(tupleRef);
}
}
-
- @Override
- public void fail() throws HyracksDataException {
- failed = true;
- if (isOpen) {
- writer.fail();
- }
- }
};
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index a7468a7..71b44d3 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -51,25 +52,20 @@
@Override
public void close() throws HyracksDataException {
- HyracksDataException closeException = null;
+ if (!isOpen) {
+ return;
+ }
+ Throwable closeException = null;
try {
flushIfNotFailed();
} catch (Exception e) {
- closeException = HyracksDataException.create(e);
- writer.fail();
+ closeException = e;
+ fail(closeException);
} finally {
- try {
- writer.close();
- } catch (Exception e) {
- if (closeException == null) {
- closeException = HyracksDataException.create(e);
- } else {
- closeException.addSuppressed(e);
- }
- }
+ closeException = CleanupUtils.close(writer, closeException);
}
if (closeException != null) {
- throw closeException;
+ throw HyracksDataException.create(closeException);
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
index 5cced8d..c7d2d94 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputPushRuntime.java
@@ -27,6 +27,7 @@
protected IFrameWriter writer;
protected RecordDescriptor outputRecordDesc;
protected boolean failed;
+ protected boolean isOpen;
@Override
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
@@ -35,8 +36,24 @@
}
@Override
+ public void open() throws HyracksDataException {
+ isOpen = true;
+ writer.open();
+ }
+
+ @Override
public void fail() throws HyracksDataException {
failed = true;
- writer.fail();
+ if (isOpen) {
+ writer.fail();
+ }
+ }
+
+ protected void fail(Throwable failure) {
+ try {
+ fail();
+ } catch (Throwable th) {
+ failure.addSuppressed(th);
+ }
}
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
index 35563e0..cccfd62 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputSourcePushRuntime.java
@@ -32,16 +32,6 @@
}
@Override
- public void close() throws HyracksDataException {
- // close is a no op since this operator completes operating in open()
- }
-
- @Override
- public void fail() throws HyracksDataException {
- // fail is a no op since if a failure happened, the operator would've already called fail() on downstream
- }
-
- @Override
public void flush() throws HyracksDataException {
// flush will never be called on this runtime
throw new UnsupportedOperationException();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 159fde7..3cee12d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -151,7 +151,8 @@
@Override
public void open() throws HyracksDataException {
- writer.open();
+ // writer opened many times?
+ super.open();
if (first) {
first = false;
initAccessAppendRef(ctx);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index f251bb7..2453029 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.std.buffermanager.EnumFreeSlotPolicy;
import org.apache.hyracks.dataflow.std.buffermanager.FrameFreeSlotPolicyFactory;
import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
@@ -69,7 +70,7 @@
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (frameSorter == null) {
IFrameBufferManager manager = new VariableFrameMemoryManager(
new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
@@ -87,11 +88,22 @@
@Override
public void close() throws HyracksDataException {
- try {
- frameSorter.sort();
- frameSorter.flush(writer);
- } finally {
- writer.close();
+ Throwable failure = null;
+ if (isOpen) {
+ try {
+ if (!failed) {
+ frameSorter.sort();
+ frameSorter.flush(writer);
+ }
+ } catch (Throwable th) {
+ failure = th;
+ fail(th);
+ } finally {
+ failure = CleanupUtils.close(writer, failure);
+ }
+ }
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
}
}
};
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index b1b652f..5b36c5f 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -96,7 +96,6 @@
private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
private boolean first = true;
- private boolean isOpen = false;
private int tupleIndex = 0;
@Override
@@ -109,15 +108,7 @@
eval[i] = evalFactories[i].createScalarEvaluator(ctx);
}
}
- isOpen = true;
- writer.open();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (isOpen) {
- super.close();
- }
+ super.open();
}
@Override
@@ -177,13 +168,6 @@
}
@Override
- public void fail() throws HyracksDataException {
- if (isOpen) {
- super.fail();
- }
- }
-
- @Override
public void flush() throws HyracksDataException {
appender.flush(writer);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 7bd924d..9ca3cd6 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -48,7 +48,7 @@
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new IllegalStateException();
}
@@ -56,13 +56,10 @@
}
@Override
- public void fail() throws HyracksDataException {
- writer.fail();
- }
-
- @Override
public void close() throws HyracksDataException {
- writer.close();
+ if (isOpen) {
+ writer.close();
+ }
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index f94672d..832cb22 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -49,11 +49,6 @@
initAccessAppend(ctx);
}
- @Override
- public void open() throws HyracksDataException {
- writer.open();
- }
-
public void writeTuple(ByteBuffer inputBuffer, int tIndex) throws HyracksDataException {
tAccess.reset(inputBuffer);
appendTupleToFrame(tIndex);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 33b7725..ca58d4d 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -89,7 +89,6 @@
private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
private boolean first = true;
- private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -104,22 +103,7 @@
for (int i = 0; i < runningAggregates.length; i++) {
raggs[i].init();
}
- isOpen = true;
- writer.open();
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (isOpen) {
- super.close();
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- if (isOpen) {
- writer.fail();
- }
+ super.open();
}
@Override
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 59df402..aca5bf1 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -72,7 +72,7 @@
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (evalMaxObjects == null) {
initAccessAppendRef(ctx);
evalMaxObjects = maxObjectsEvalFactory.createScalarEvaluator(ctx);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index a8ca082..713a99c 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -53,7 +53,7 @@
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (first) {
first = false;
initAccessAppend(ctx);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 171544d..933e640 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -92,8 +92,7 @@
initAccessAppendFieldRef(ctx);
eval = cond.createScalarEvaluator(ctx);
}
- writer.open();
-
+ super.open();
//prepare nullTupleBuilder
if (retainMissing && missingWriter == null) {
missingWriter = missingWriterFactory.createMissingWriter();
@@ -105,15 +104,6 @@
}
@Override
- public void close() throws HyracksDataException {
- try {
- flushIfNotFailed();
- } finally {
- writer.close();
- }
- }
-
- @Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
index 53974b2..7e5c346 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StringStreamingRuntimeFactory.java
@@ -43,6 +43,9 @@
private char fieldDelimiter;
private ITupleParserFactory parserFactory;
+ /*
+ * NOTE: This operator doesn't follow the IFrameWriter protocol
+ */
public StringStreamingRuntimeFactory(String command, IPrinterFactory[] printerFactories, char fieldDelimiter,
ITupleParserFactory parserFactory) {
super(null);
@@ -129,7 +132,6 @@
first = false;
initAccessAppendRef(ctx);
}
-
try {
ITupleParser parser = parserFactory.createTupleParser(ctx);
process = Runtime.getRuntime().exec(command);
@@ -141,6 +143,7 @@
new DumpInStreamToPrintStream(process.getErrorStream(), System.err);
dumpStderr = new Thread(disps);
dumpStderr.start();
+ super.open();
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 914f4a0..22189ac 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -94,7 +94,7 @@
@Override
public void open() throws HyracksDataException {
- writer.open();
+ super.open();
if (tRef == null) {
initAccessAppendRef(ctx);
}