Changed the IFrameWriter Contract
Updated existing operators and added a test case for BTreeSearchOperatorNodePushable.
With this change, calling the open method itself moves it to the open state and
hence, close must be called.
Change-Id: I03da090002f79f4db7b5b31454ce3ac2b9e40c7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/551
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <buyingyi@gmail.com>
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
index 8759f1b..bafe8a7 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/AggregateRuntimeFactory.java
@@ -69,6 +69,7 @@
private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(aggregs.length);
private boolean first = true;
+ private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -86,7 +87,7 @@
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
-
+ isOpen = true;
writer.open();
}
@@ -103,9 +104,14 @@
@Override
public void close() throws HyracksDataException {
- computeAggregate();
- appendToFrameFromTupleBuilder(tupleBuilder);
- super.close();
+ if (isOpen) {
+ try {
+ computeAggregate();
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ } finally {
+ super.close();
+ }
+ }
}
private void computeAggregate() throws HyracksDataException {
@@ -132,7 +138,9 @@
@Override
public void fail() throws HyracksDataException {
- writer.fail();
+ if (isOpen) {
+ writer.fail();
+ }
}
};
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 7fd1d05..e57a4ba 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -51,8 +51,11 @@
@Override
public void close() throws HyracksDataException {
- flushIfNotFailed();
- writer.close();
+ try {
+ flushIfNotFailed();
+ } finally {
+ writer.close();
+ }
}
protected void flushAndReset() throws HyracksDataException {
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index f10151e..25eb229 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -116,6 +116,7 @@
appendNullsToTuple();
appendToFrameFromTupleBuilder(tb);
}
+
}
@Override
@@ -146,11 +147,11 @@
@Override
public void open() throws HyracksDataException {
+ writer.open();
if (first) {
first = false;
initAccessAppendRef(ctx);
}
- writer.open();
}
@Override
@@ -164,6 +165,7 @@
startOfPipeline.close();
}
}
+
};
}
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
index 291b92d..bca301f 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/sort/InMemorySortRuntimeFactory.java
@@ -31,8 +31,8 @@
import org.apache.hyracks.dataflow.std.sort.FrameSorterMergeSort;
import org.apache.hyracks.dataflow.std.sort.buffermanager.FrameFreeSlotLastFit;
import org.apache.hyracks.dataflow.std.sort.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFrameMemoryManager;
+import org.apache.hyracks.dataflow.std.sort.buffermanager.VariableFramePool;
public class InMemorySortRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -64,15 +64,14 @@
@Override
public void open() throws HyracksDataException {
+ writer.open();
if (frameSorter == null) {
IFrameBufferManager manager = new VariableFrameMemoryManager(
- new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY),
- new FrameFreeSlotLastFit());
+ new VariableFramePool(ctx, VariableFramePool.UNLIMITED_MEMORY), new FrameFreeSlotLastFit());
frameSorter = new FrameSorterMergeSort(ctx, manager, sortFields, firstKeyNormalizerFactory,
comparatorFactories, outputRecordDesc);
}
frameSorter.reset();
- writer.open();
}
@Override
@@ -87,9 +86,12 @@
@Override
public void close() throws HyracksDataException {
- frameSorter.sort();
- frameSorter.flush(writer);
- writer.close();
+ try {
+ frameSorter.sort();
+ frameSorter.flush(writer);
+ } finally {
+ writer.close();
+ }
}
};
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
index 9c8a35c..79d9b7d1 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/AssignRuntimeFactory.java
@@ -96,6 +96,7 @@
private IScalarEvaluator[] eval = new IScalarEvaluator[evalFactories.length];
private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
private boolean first = true;
+ private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -111,10 +112,18 @@
}
}
}
+ isOpen = true;
writer.open();
}
@Override
+ public void close() throws HyracksDataException {
+ if (isOpen) {
+ super.close();
+ }
+ }
+
+ @Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
@@ -158,7 +167,9 @@
@Override
public void fail() throws HyracksDataException {
- writer.fail();
+ if (isOpen) {
+ writer.fail();
+ }
}
};
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
index 83d0c61..3d1eb06 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
@@ -66,9 +66,10 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
return new AbstractUnaryInputOperatorNodePushable() {
private final IFrameWriter[] writers = new IFrameWriter[outputArity];
+ private final boolean[] isOpen = new boolean[outputArity];
private final IFrame[] writeBuffers = new IFrame[outputArity];
private final ICopyEvaluator[] evals = new ICopyEvaluator[outputArity];
private final ArrayBackedValueStorage evalBuf = new ArrayBackedValueStorage();
@@ -83,19 +84,52 @@
@Override
public void close() throws HyracksDataException {
- // Flush (possibly not full) buffers that have data, and close writers.
+ HyracksDataException hde = null;
for (int i = 0; i < outputArity; i++) {
- tupleAppender.reset(writeBuffers[i], false);
- // ? by JF why didn't clear the buffer ?
- tupleAppender.flush(writers[i], false);
- writers[i].close();
+ if (isOpen[i]) {
+ try {
+ tupleAppender.reset(writeBuffers[i], false);
+ // ? by JF why didn't clear the buffer ?
+ tupleAppender.flush(writers[i], false);
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException();
+ }
+ hde.addSuppressed(th);
+ } finally {
+ try {
+ writers[i].close();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException();
+ }
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (hde != null) {
+ throw hde;
}
}
@Override
public void fail() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.fail();
+ HyracksDataException hde = null;
+ for (int i = 0; i < outputArity; i++) {
+ if (isOpen[i]) {
+ try {
+ writers[i].fail();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException();
+ }
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ if (hde != null) {
+ throw hde;
}
}
@@ -144,8 +178,9 @@
@Override
public void open() throws HyracksDataException {
- for (IFrameWriter writer : writers) {
- writer.open();
+ for (int i = 0; i < writers.length; i++) {
+ isOpen[i] = true;
+ writers[i].open();
}
// Create write buffers.
for (int i = 0; i < outputArity; i++) {
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
index 34bd1e0..8a5f38c 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/RunningAggregateRuntimeFactory.java
@@ -90,6 +90,7 @@
private final IRunningAggregateEvaluator[] raggs = new IRunningAggregateEvaluator[runningAggregates.length];
private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
private boolean first = true;
+ private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -112,10 +113,25 @@
throw new HyracksDataException(ae);
}
}
+ isOpen = true;
writer.open();
}
@Override
+ public void close() throws HyracksDataException {
+ if (isOpen) {
+ super.close();
+ }
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ if (isOpen) {
+ super.fail();
+ }
+ }
+
+ @Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
index 5605485..ef172c7 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamLimitRuntimeFactory.java
@@ -73,7 +73,7 @@
@Override
public void open() throws HyracksDataException {
- // if (first) {
+ writer.open();
if (evalMaxObjects == null) {
initAccessAppendRef(ctx);
try {
@@ -85,14 +85,12 @@
throw new HyracksDataException(ae);
}
}
- writer.open();
afterLastTuple = false;
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
if (afterLastTuple) {
- // ignore the data
return;
}
tAccess.reset(buffer);
@@ -123,7 +121,6 @@
appendTupleToFrame(t);
}
} else {
- // close();
afterLastTuple = true;
break;
}
@@ -136,9 +133,7 @@
toSkip = 0; // how many tuples still to skip
firstTuple = true;
afterLastTuple = false;
- // if (!afterLastTuple) {
super.close();
- // }
}
private int evaluateInteger(IScalarEvaluator eval, int tIdx) throws HyracksDataException {
@@ -154,5 +149,4 @@
};
}
-
}
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 34754b4..2cea90d 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -56,11 +56,11 @@
@Override
public void open() throws HyracksDataException {
+ writer.open();
if (first) {
first = false;
initAccessAppend(ctx);
}
- writer.open();
}
@Override
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
index 416c398..75c3d08 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/StreamSelectRuntimeFactory.java
@@ -53,7 +53,8 @@
/**
* @param cond
- * @param projectionList if projectionList is null, then no projection is performed
+ * @param projectionList
+ * if projectionList is null, then no projection is performed
* @param retainNull
* @param nullPlaceholderVariableIndex
* @param nullWriterFactory
@@ -83,6 +84,7 @@
private IScalarEvaluator eval;
private INullWriter nullWriter = null;
private ArrayTupleBuilder nullTupleBuilder = null;
+ private boolean isOpen = false;
@Override
public void open() throws HyracksDataException {
@@ -94,6 +96,7 @@
throw new HyracksDataException(ae);
}
}
+ isOpen = true;
writer.open();
//prepare nullTupleBuilder
@@ -107,6 +110,24 @@
}
@Override
+ public void fail() throws HyracksDataException {
+ if (isOpen) {
+ super.fail();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (isOpen) {
+ try {
+ flushIfNotFailed();
+ } finally {
+ writer.close();
+ }
+ }
+ }
+
+ @Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
tAccess.reset(buffer);
int nTuple = tAccess.getTupleCount();
diff --git a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 8850436..6b21cda 100644
--- a/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -36,7 +36,6 @@
import org.apache.hyracks.data.std.primitive.IntegerPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
public class UnnestRuntimeFactory extends AbstractOneInputOneOutputRuntimeFactory {
@@ -96,6 +95,7 @@
@Override
public void open() throws HyracksDataException {
+ writer.open();
initAccessAppendRef(ctx);
try {
agg = unnestingFactory.createUnnestingEvaluator(ctx);
@@ -103,7 +103,6 @@
throw new HyracksDataException(ae);
}
tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- writer.open();
}
@Override
@@ -112,16 +111,12 @@
int nTuple = tAccess.getTupleCount();
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
-
try {
offsetEval.evaluate(tRef, p);
} catch (AlgebricksException e) {
throw new HyracksDataException(e);
}
-
- @SuppressWarnings("static-access")
int offset = IntegerPointable.getInteger(p.getByteArray(), p.getStartOffset());
-
try {
agg.init(tRef);
// assume that when unnesting the tuple, each step() call for each element
diff --git a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index 260caa1..f6c3ad0b 100644
--- a/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -33,22 +33,18 @@
* A producer follows the following protocol when using an {@link IFrameWriter}.
* Initially, the {@link IFrameWriter} is in the INITIAL state.
* The first valid call to an {@link IFrameWriter} is always the {@link IFrameWriter#open()}. This call provides the opportunity for the {@link IFrameWriter} implementation to allocate any resources for its
- * processing. Once this call returns, the {@link IFrameWriter} is in the OPENED
- * state. If an error occurs
- * during the {@link IFrameWriter#open()} call, a {@link HyracksDataException} is thrown and it stays in the INITIAL state.
- * While the {@link IFrameWriter} is in the OPENED state, the producer can call
- * one of:
+ * processing. Once open() is called, no matter successfully or not, the {@link IFrameWriter} is in the OPENED
+ * state.
+ * While the {@link IFrameWriter} is in the OPENED state, the producer can call one of:
* <ul>
- * <li> {@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
- * <li> {@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, and the {@link IFrameWriter} enters the ERROR state.</li>
- * <li> {@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
+ * <li>{@link IFrameWriter#close()} to give up any resources owned by the {@link IFrameWriter} and enter the CLOSED state.</li>
+ * <li>{@link IFrameWriter#nextFrame(ByteBuffer)} to provide data to the {@link IFrameWriter}. The call returns normally on success and the {@link IFrameWriter} remains in the OPENED state. On failure, the call throws a {@link HyracksDataException}, the {@link IFrameWriter} remains in the OPENED state.</li>
+ * <li>{@link IFrameWriter#fail()} to indicate that stream is to be aborted. The {@link IFrameWriter} enters the FAILED state.</li>
* </ul>
* In the FAILED state, the only call allowed is the {@link IFrameWriter#close()} to move the {@link IFrameWriter} into the CLOSED
* state and give up all resources.
* No calls are allowed when the {@link IFrameWriter} is in the CLOSED state.
- * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} is not called by the producer. So an exceptional
- * return from the {@link IFrameWriter#open()} call must clean up all partially
- * allocated resources.
+ * Note: If the call to {@link IFrameWriter#open()} failed, the {@link IFrameWriter#close()} must still be called by the producer.
*
* @author vinayakb
*/
@@ -61,7 +57,8 @@
/**
* Provide data to the stream of this {@link IFrameWriter}.
*
- * @param buffer - Buffer containing data.
+ * @param buffer
+ * - Buffer containing data.
* @throws HyracksDataException
*/
public void nextFrame(ByteBuffer buffer) throws HyracksDataException;
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
index 09dd03d..a46fa7b 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/profiling/ConnectorSenderProfilingFrameWriter.java
@@ -32,16 +32,16 @@
private final ICounter closeCounter;
private final ICounter frameCounter;
- public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer,
- ConnectorDescriptorId cdId, int senderIndex, int receiverIndex) {
+ public ConnectorSenderProfilingFrameWriter(IHyracksTaskContext ctx, IFrameWriter writer, ConnectorDescriptorId cdId,
+ int senderIndex, int receiverIndex) {
this.writer = writer;
int attempt = ctx.getTaskAttemptId().getAttempt();
- this.openCounter = ctx.getCounterContext().getCounter(
- cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
- this.closeCounter = ctx.getCounterContext().getCounter(
- cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
- this.frameCounter = ctx.getCounterContext().getCounter(
- cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
+ this.openCounter = ctx.getCounterContext()
+ .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".open", true);
+ this.closeCounter = ctx.getCounterContext()
+ .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".close", true);
+ this.frameCounter = ctx.getCounterContext()
+ .getCounter(cdId + ".sender." + attempt + "." + senderIndex + "." + receiverIndex + ".nextFrame", true);
}
@Override
@@ -58,8 +58,11 @@
@Override
public void close() throws HyracksDataException {
- closeCounter.update(1);
- writer.close();
+ try {
+ closeCounter.update(1);
+ } finally {
+ writer.close();
+ }
}
@Override
diff --git a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
index 391a636..1ebebda 100644
--- a/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -70,10 +69,11 @@
recordDescriptors[0] = helper.getMapOutputRecordDescriptor();
}
+ @SuppressWarnings("deprecation")
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
final HadoopHelper helper = new HadoopHelper(config);
final Configuration conf = helper.getConfiguration();
final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
@@ -219,18 +219,19 @@
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(),
- runGen.getRuns(), new int[] { 0 }, comparators, null,
- helper.getMapOutputRecordDescriptorWithoutExtraFields(), framesLimit, delegatingWriter);
+ ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(), runGen.getRuns(),
+ new int[] { 0 }, comparators, null, helper.getMapOutputRecordDescriptorWithoutExtraFields(),
+ framesLimit, delegatingWriter);
merger.process();
}
}
return new AbstractUnaryOutputSourceOperatorNodePushable() {
+ @SuppressWarnings("unchecked")
@Override
public void initialize() throws HyracksDataException {
- writer.open();
try {
+ writer.open();
SortingRecordWriter recordWriter = new SortingRecordWriter();
InputSplit split = null;
int blockId = 0;
@@ -246,9 +247,8 @@
Thread.currentThread().setContextClassLoader(ctxCL);
}
recordWriter.initBlock(blockId);
- Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil()
- .createMapContext(conf, taId, recordReader,
- recordWriter, null, null, split);
+ Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId,
+ recordReader, recordWriter, null, null, split);
mapper.run(mCtx);
recordReader.close();
recordWriter.sortAndFlushBlock(writer);
@@ -259,6 +259,9 @@
throw new HyracksDataException(e);
}
}
+ } catch (Throwable th) {
+ writer.fail();
+ throw th;
} finally {
writer.close();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
index 3e437e0..ee8c656 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/LocalityAwarePartitionDataWriter.java
@@ -36,6 +36,7 @@
public class LocalityAwarePartitionDataWriter implements IFrameWriter {
private final IFrameWriter[] pWriters;
+ private final boolean[] isWriterOpen;
private final IFrameTupleAppender[] appenders;
private final FrameTupleAccessor tupleAccessor;
private final ITuplePartitionComputer tpc;
@@ -46,6 +47,7 @@
int[] consumerPartitions = localityMap.getConsumers(senderIndex, nConsumerPartitions);
pWriters = new IFrameWriter[consumerPartitions.length];
appenders = new IFrameTupleAppender[consumerPartitions.length];
+ isWriterOpen = new boolean[consumerPartitions.length];
for (int i = 0; i < consumerPartitions.length; ++i) {
try {
pWriters[i] = pwFactory.createFrameWriter(consumerPartitions[i]);
@@ -67,6 +69,7 @@
@Override
public void open() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
+ isWriterOpen[i] = true;
pWriters[i].open();
}
}
@@ -94,8 +97,22 @@
*/
@Override
public void fail() throws HyracksDataException {
+ HyracksDataException failException = null;
for (int i = 0; i < appenders.length; ++i) {
- pWriters[i].fail();
+ if (isWriterOpen[i]) {
+ try {
+ pWriters[i].fail();
+ } catch (Throwable th) {
+ if (failException == null) {
+ failException = new HyracksDataException(th);
+ } else {
+ failException.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (failException != null) {
+ throw failException;
}
}
@@ -106,10 +123,32 @@
*/
@Override
public void close() throws HyracksDataException {
+ HyracksDataException closeException = null;
for (int i = 0; i < pWriters.length; ++i) {
- appenders[i].flush(pWriters[i], true);
- pWriters[i].close();
+ if (isWriterOpen[i]) {
+ try {
+ appenders[i].flush(pWriters[i], true);
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
+ } finally {
+ try {
+ pWriters[i].close();
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
+ }
+ }
+ }
+ }
+ if (closeException != null) {
+ throw closeException;
}
}
-
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
index 1730b22..7a3a019 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNReplicatingConnectorDescriptor.java
@@ -43,8 +43,9 @@
@Override
public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
final IFrameWriter[] epWriters = new IFrameWriter[nConsumerPartitions];
+ final boolean[] isOpen = new boolean[nConsumerPartitions];
for (int i = 0; i < nConsumerPartitions; ++i) {
epWriters[i] = edwFactory.createFrameWriter(i);
}
@@ -62,21 +63,50 @@
@Override
public void fail() throws HyracksDataException {
+ HyracksDataException failException = null;
for (int i = 0; i < epWriters.length; ++i) {
- epWriters[i].fail();
+ if (isOpen[i]) {
+ try {
+ epWriters[i].fail();
+ } catch (Throwable th) {
+ if (failException == null) {
+ failException = new HyracksDataException(th);
+ } else {
+ failException.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (failException != null) {
+ throw failException;
}
}
@Override
public void close() throws HyracksDataException {
+ HyracksDataException closeException = null;
for (int i = 0; i < epWriters.length; ++i) {
- epWriters[i].close();
+ if (isOpen[i]) {
+ try {
+ epWriters[i].close();
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (closeException != null) {
+ throw closeException;
}
}
@Override
public void open() throws HyracksDataException {
for (int i = 0; i < epWriters.length; ++i) {
+ isOpen[i] = true;
epWriters[i].open();
}
}
@@ -84,8 +114,8 @@
}
@Override
- public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
- int index, int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
+ public IPartitionCollector createPartitionCollector(IHyracksTaskContext ctx, RecordDescriptor recordDesc, int index,
+ int nProducerPartitions, int nConsumerPartitions) throws HyracksDataException {
BitSet expectedPartitions = new BitSet(nProducerPartitions);
expectedPartitions.set(0, nProducerPartitions);
NonDeterministicChannelReader channelReader = new NonDeterministicChannelReader(nProducerPartitions,
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 08df2c5..336272c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -35,6 +35,7 @@
public class PartitionDataWriter implements IFrameWriter {
private final int consumerPartitionCount;
private final IFrameWriter[] pWriters;
+ private final boolean[] isOpen;
private final FrameTupleAppender[] appenders;
private final FrameTupleAccessor tupleAccessor;
private final ITuplePartitionComputer tpc;
@@ -45,6 +46,7 @@
RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) throws HyracksDataException {
this.consumerPartitionCount = consumerPartitionCount;
pWriters = new IFrameWriter[consumerPartitionCount];
+ isOpen = new boolean[consumerPartitionCount];
appenders = new FrameTupleAppender[consumerPartitionCount];
for (int i = 0; i < consumerPartitionCount; ++i) {
try {
@@ -61,17 +63,40 @@
@Override
public void close() throws HyracksDataException {
+ HyracksDataException closeException = null;
for (int i = 0; i < pWriters.length; ++i) {
- if (allocatedFrame) {
- appenders[i].flush(pWriters[i], true);
+ if (isOpen[i]) {
+ if (allocatedFrame) {
+ try {
+ appenders[i].flush(pWriters[i], true);
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
+ }
+ }
+ try {
+ pWriters[i].close();
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
+ }
}
- pWriters[i].close();
+ }
+ if (closeException != null) {
+ throw closeException;
}
}
@Override
public void open() throws HyracksDataException {
for (int i = 0; i < pWriters.length; ++i) {
+ isOpen[i] = true;
pWriters[i].open();
}
}
@@ -99,8 +124,22 @@
@Override
public void fail() throws HyracksDataException {
+ HyracksDataException failException = null;
for (int i = 0; i < appenders.length; ++i) {
- pWriters[i].fail();
+ if (isOpen[i]) {
+ try {
+ pWriters[i].fail();
+ } catch (Throwable th) {
+ if (failException == null) {
+ failException = new HyracksDataException(th);
+ } else {
+ failException.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (failException != null) {
+ throw failException;
}
}
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
index f94d985..0f3687e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileScanOperatorDescriptor.java
@@ -56,8 +56,8 @@
@Override
public void initialize() throws HyracksDataException {
File f = split.getLocalFile().getFile();
- writer.open();
try {
+ writer.open();
InputStream in;
try {
in = new FileInputStream(f);
@@ -66,9 +66,9 @@
throw new HyracksDataException(e);
}
tp.parse(in, writer);
- } catch (Exception e) {
+ } catch (Throwable th) {
writer.fail();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
} finally {
writer.close();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
index 814e537..779c631 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupMergeOperatorNodePushable.java
@@ -122,8 +122,8 @@
public void initialize() throws HyracksDataException {
aggState = (ExternalGroupState) ctx.getStateObject(stateId);
runs = aggState.getRuns();
- writer.open();
try {
+ writer.open();
if (runs.size() <= 0) {
ISpillableTable gTable = aggState.getSpillableTable();
if (gTable != null) {
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 7fa9fcc..1c08b53 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -29,8 +29,8 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppenderWrapper;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.group.AggregateState;
import org.apache.hyracks.dataflow.std.group.IAggregatorDescriptor;
@@ -65,8 +65,8 @@
RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
this.groupFields = groupFields;
this.comparators = comparators;
- this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields,
- groupFields, writer);
+ this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields,
+ writer);
this.aggregateState = aggregator.createAggregateStates();
copyFrame = new VSizeFrame(ctx);
inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
@@ -138,9 +138,9 @@
for (int j = 0; j < groupFields.length; j++) {
tupleBuilder.addField(lastTupleAccessor, lastTupleIndex, groupFields[j]);
}
- boolean hasOutput = outputPartial ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor,
- lastTupleIndex, aggregateState) : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor,
- lastTupleIndex, aggregateState);
+ boolean hasOutput = outputPartial
+ ? aggregator.outputPartialResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState)
+ : aggregator.outputFinalResult(tupleBuilder, lastTupleAccessor, lastTupleIndex, aggregateState);
if (hasOutput) {
appenderWrapper.appendSkipEmptyField(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray(), 0,
@@ -172,13 +172,16 @@
@Override
public void close() throws HyracksDataException {
- if (!isFailed && !first) {
- assert(copyFrameAccessor.getTupleCount() > 0);
- writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
- appenderWrapper.flush();
+ try {
+ if (!isFailed && !first) {
+ assert (copyFrameAccessor.getTupleCount() > 0);
+ writeOutput(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1);
+ appenderWrapper.flush();
+ }
+ aggregator.close();
+ aggregateState.close();
+ } finally {
+ appenderWrapper.close();
}
- aggregator.close();
- aggregateState.close();
- appenderWrapper.close();
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
index 349cc5a..8ba7626 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/GraceHashJoinOperatorNodePushable.java
@@ -104,11 +104,8 @@
nullWriters1[i] = nullWriterFactories[i].createNullWriter();
}
}
-
- writer.open();// open for probe
-
try {
-
+ writer.open();// open for probe
IFrame buffer = new VSizeFrame(ctx);
// buffer
int tableSize = (int) (numPartitions * recordsPerFrame * factor);
@@ -148,9 +145,9 @@
probeReader.close();
joiner.closeJoin(writer);
}
- } catch (Exception e) {
+ } catch (Throwable th) {
writer.fail();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
} finally {
writer.close();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
index f72d528..7badc1e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinOperatorDescriptor.java
@@ -188,7 +188,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(joinAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -201,12 +201,12 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
- .createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+ : predEvaluatorFactory.createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
- .getJobId(), new TaskId(getActivityId(), partition));
+ private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
+ ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
private final FrameTupleAccessor accessorBuild = new FrameTupleAccessor(rd1);
private final ITuplePartitionComputer hpcBuild = new FieldHashPartitionComputerFactory(keys1,
hashFunctionFactories).createPartitioner();
@@ -302,8 +302,8 @@
if (memsize > inputsize0) {
state.nPartitions = 0;
} else {
- state.nPartitions = (int) (Math.ceil((inputsize0 * factor / nPartitions - memsize)
- / (memsize - 1)));
+ state.nPartitions = (int) (Math
+ .ceil((inputsize0 * factor / nPartitions - memsize) / (memsize - 1)));
}
if (state.nPartitions <= 0) {
// becomes in-memory HJ
@@ -352,8 +352,8 @@
private void write(int i, ByteBuffer head) throws HyracksDataException {
RunFileWriter writer = state.fWriters[i];
if (writer == null) {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- BuildAndPartitionActivityNode.class.getSimpleName());
+ FileReference file = ctx.getJobletContext()
+ .createManagedWorkspaceFile(BuildAndPartitionActivityNode.class.getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
state.fWriters[i] = writer;
@@ -378,7 +378,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
@@ -391,8 +391,8 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
- .createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+ : predEvaluatorFactory.createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
private BuildAndPartitionTaskState state;
@@ -413,9 +413,9 @@
@Override
public void open() throws HyracksDataException {
- state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
- BUILD_AND_PARTITION_ACTIVITY_ID), partition));
writer.open();
+ state = (BuildAndPartitionTaskState) ctx.getStateObject(
+ new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
buildWriters = state.fWriters;
probeWriters = new RunFileWriter[state.nPartitions];
bufferForPartitions = new IFrame[state.nPartitions];
@@ -483,65 +483,69 @@
@Override
public void close() throws HyracksDataException {
- state.joiner.join(inBuffer.getBuffer(), writer);
- state.joiner.closeJoin(writer);
- ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
- .createPartitioner();
- ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
- .createPartitioner();
- if (state.memoryForHashtable != memsize - 2) {
- for (int i = 0; i < state.nPartitions; i++) {
- ByteBuffer buf = bufferForPartitions[i].getBuffer();
- accessorProbe.reset(buf);
- if (accessorProbe.getTupleCount() > 0) {
- write(i, buf);
+ try {
+ state.joiner.join(inBuffer.getBuffer(), writer);
+ state.joiner.closeJoin(writer);
+ ITuplePartitionComputer hpcRep0 = new RepartitionComputerFactory(state.nPartitions, hpcf0)
+ .createPartitioner();
+ ITuplePartitionComputer hpcRep1 = new RepartitionComputerFactory(state.nPartitions, hpcf1)
+ .createPartitioner();
+ if (state.memoryForHashtable != memsize - 2) {
+ for (int i = 0; i < state.nPartitions; i++) {
+ ByteBuffer buf = bufferForPartitions[i].getBuffer();
+ accessorProbe.reset(buf);
+ if (accessorProbe.getTupleCount() > 0) {
+ write(i, buf);
+ }
+ closeWriter(i);
}
- closeWriter(i);
- }
- inBuffer.reset();
- int tableSize = -1;
- if (state.memoryForHashtable == 0) {
- tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
- } else {
- tableSize = (int) (memsize * recordsPerFrame * factor);
- }
- ISerializableTable table = new SerializableHashTable(tableSize, ctx);
- for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
- RunFileWriter buildWriter = buildWriters[partitionid];
- RunFileWriter probeWriter = probeWriters[partitionid];
- if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
- continue;
+ inBuffer.reset();
+ int tableSize = -1;
+ if (state.memoryForHashtable == 0) {
+ tableSize = (int) (state.nPartitions * recordsPerFrame * factor);
+ } else {
+ tableSize = (int) (memsize * recordsPerFrame * factor);
}
- table.reset();
- InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0),
- hpcRep0, new FrameTupleAccessor(rd1), hpcRep1, new FrameTuplePairComparator(keys0,
- keys1, comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+ ISerializableTable table = new SerializableHashTable(tableSize, ctx);
+ for (int partitionid = 0; partitionid < state.nPartitions; partitionid++) {
+ RunFileWriter buildWriter = buildWriters[partitionid];
+ RunFileWriter probeWriter = probeWriters[partitionid];
+ if ((buildWriter == null && !isLeftOuter) || probeWriter == null) {
+ continue;
+ }
+ table.reset();
+ InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, tableSize,
+ new FrameTupleAccessor(rd0), hpcRep0, new FrameTupleAccessor(rd1), hpcRep1,
+ new FrameTuplePairComparator(keys0, keys1, comparators), isLeftOuter,
+ nullWriters1, table, predEvaluator);
- if (buildWriter != null) {
- RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
- buildReader.open();
- while (buildReader.nextFrame(inBuffer)) {
- ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
- FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
- joiner.build(copyBuffer);
+ if (buildWriter != null) {
+ RunFileReader buildReader = buildWriter.createDeleteOnCloseReader();
+ buildReader.open();
+ while (buildReader.nextFrame(inBuffer)) {
+ ByteBuffer copyBuffer = ctx.allocateFrame(inBuffer.getFrameSize());
+ FrameUtils.copyAndFlip(inBuffer.getBuffer(), copyBuffer);
+ joiner.build(copyBuffer);
+ inBuffer.reset();
+ }
+ buildReader.close();
+ }
+
+ // probe
+ RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
+ probeReader.open();
+ while (probeReader.nextFrame(inBuffer)) {
+ joiner.join(inBuffer.getBuffer(), writer);
inBuffer.reset();
}
- buildReader.close();
+ probeReader.close();
+ joiner.closeJoin(writer);
}
-
- // probe
- RunFileReader probeReader = probeWriter.createDeleteOnCloseReader();
- probeReader.open();
- while (probeReader.nextFrame(inBuffer)) {
- joiner.join(inBuffer.getBuffer(), writer);
- inBuffer.reset();
- }
- probeReader.close();
- joiner.closeJoin(writer);
}
+ } finally {
+ writer.close();
}
- writer.close();
}
private void closeWriter(int i) throws HyracksDataException {
@@ -554,8 +558,8 @@
private void write(int i, ByteBuffer head) throws HyracksDataException {
RunFileWriter writer = probeWriters[i];
if (writer == null) {
- FileReference file = ctx.createManagedWorkspaceFile(PartitionAndJoinActivityNode.class
- .getSimpleName());
+ FileReference file = ctx
+ .createManagedWorkspaceFile(PartitionAndJoinActivityNode.class.getSimpleName());
writer = new RunFileWriter(file, ctx.getIOManager());
writer.open();
probeWriters[i] = writer;
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index dae441b..87ac9bd 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -82,8 +82,7 @@
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories, IBinaryComparatorFactory[] comparatorFactories,
IPredicateEvaluatorFactory predEvalFactory, RecordDescriptor recordDescriptor, boolean isLeftOuter,
- INullWriterFactory[] nullWriterFactories1,
- int tableSize) {
+ INullWriterFactory[] nullWriterFactories1, int tableSize) {
super(spec, 2, 1);
this.keys0 = keys0;
this.keys1 = keys1;
@@ -174,9 +173,8 @@
nullWriters1[i] = nullWriterFactories1[i].createNullWriter();
}
}
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ?
- null :
- predEvaluatorFactory.createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+ : predEvaluatorFactory.createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
private HashBuildTaskState state;
@@ -187,13 +185,12 @@
.createPartitioner();
ITuplePartitionComputer hpc1 = new FieldHashPartitionComputerFactory(keys1, hashFunctionFactories)
.createPartitioner();
- state = new HashBuildTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
- partition));
+ state = new HashBuildTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
ISerializableTable table = new SerializableHashTable(tableSize, ctx);
- state.joiner = new InMemoryHashJoin(ctx, tableSize,
- new FrameTupleAccessor(rd0), hpc0, new FrameTupleAccessor(rd1), hpc1,
- new FrameTuplePairComparator(keys0, keys1,
- comparators), isLeftOuter, nullWriters1, table, predEvaluator);
+ state.joiner = new InMemoryHashJoin(ctx, tableSize, new FrameTupleAccessor(rd0), hpc0,
+ new FrameTupleAccessor(rd1), hpc1, new FrameTuplePairComparator(keys0, keys1, comparators),
+ isLeftOuter, nullWriters1, table, predEvaluator);
}
@Override
@@ -231,9 +228,9 @@
@Override
public void open() throws HyracksDataException {
- state = (HashBuildTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(), 0),
- partition));
writer.open();
+ state = (HashBuildTaskState) ctx
+ .getStateObject(new TaskId(new ActivityId(getOperatorId(), 0), partition));
}
@Override
@@ -243,8 +240,11 @@
@Override
public void close() throws HyracksDataException {
- state.joiner.closeJoin(writer);
- writer.close();
+ try {
+ state.joiner.closeJoin(writer);
+ } finally {
+ writer.close();
+ }
}
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 03570c7..a12450e 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -132,9 +132,8 @@
final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
- final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null) ?
- predEvaluatorFactory.createPredicateEvaluator() :
- null);
+ final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null)
+ ? predEvaluatorFactory.createPredicateEvaluator() : null);
final INullWriter[] nullWriters1 = isLeftOuter ? new INullWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
@@ -148,12 +147,11 @@
@Override
public void open() throws HyracksDataException {
- state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(), new TaskId(getActivityId(),
- partition));
+ state = new JoinCacheTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
- state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0),
- new FrameTupleAccessor(rd1), comparator, memSize, predEvaluator, isLeftOuter,
- nullWriters1);
+ state.joiner = new NestedLoopJoin(ctx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
+ comparator, memSize, predEvaluator, isLeftOuter, nullWriters1);
}
@@ -194,9 +192,9 @@
@Override
public void open() throws HyracksDataException {
- state = (JoinCacheTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
- JOIN_CACHE_ACTIVITY_ID), partition));
writer.open();
+ state = (JoinCacheTaskState) ctx.getStateObject(
+ new TaskId(new ActivityId(getOperatorId(), JOIN_CACHE_ACTIVITY_ID), partition));
}
@Override
@@ -206,8 +204,11 @@
@Override
public void close() throws HyracksDataException {
- state.joiner.closeJoin(writer);
- writer.close();
+ try {
+ state.joiner.closeJoin(writer);
+ } finally {
+ writer.close();
+ }
}
@Override
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 0278f92..c0c467a 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -43,7 +43,6 @@
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
-import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFamily;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -52,7 +51,6 @@
import org.apache.hyracks.dataflow.common.comm.io.FrameTuplePairComparator;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFamily;
-import org.apache.hyracks.dataflow.common.data.partition.RepartitionComputerFamily;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
@@ -160,7 +158,7 @@
IBinaryComparatorFactory[] comparatorFactories, RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory0,
ITuplePairComparatorFactory tupPaircomparatorFactory1, IPredicateEvaluatorFactory predEvaluatorFactory)
- throws HyracksDataException {
+ throws HyracksDataException {
super(spec, 2, 1);
this.memsize = memsize;
@@ -207,8 +205,7 @@
if (memorySize > buildSize) {
return 1; //We will switch to in-Mem HJ eventually
}
- numberOfPartitions = (int) (Math.ceil((double) (buildSize * factor / nPartitions - memorySize)
- / (double) (memorySize - 1)));
+ numberOfPartitions = (int) (Math.ceil((buildSize * factor / nPartitions - memorySize) / (memorySize - 1)));
if (numberOfPartitions <= 0) {
numberOfPartitions = 1; //becomes in-memory hash join
}
@@ -273,12 +270,12 @@
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
- .createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+ : predEvaluatorFactory.createPredicateEvaluator());
IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
- private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(ctx.getJobletContext()
- .getJobId(), new TaskId(getActivityId(), partition));
+ private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
+ ctx.getJobletContext().getJobId(), new TaskId(getActivityId(), partition));
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
hashFunctionGeneratorFactories).createPartitioner(0);
@@ -351,15 +348,15 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
final ITuplePairComparator nljComparator0 = tuplePairComparatorFactory0.createTuplePairComparator(ctx);
final ITuplePairComparator nljComparator1 = tuplePairComparatorFactory1.createTuplePairComparator(ctx);
- final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null : predEvaluatorFactory
- .createPredicateEvaluator());
+ final IPredicateEvaluator predEvaluator = (predEvaluatorFactory == null ? null
+ : predEvaluatorFactory.createPredicateEvaluator());
for (int i = 0; i < comparatorFactories.length; i++) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -378,12 +375,11 @@
@Override
public void open() throws HyracksDataException {
- state = (BuildAndPartitionTaskState) ctx.getStateObject(new TaskId(new ActivityId(getOperatorId(),
- BUILD_AND_PARTITION_ACTIVITY_ID), partition));
-
writer.open();
- state.hybridHJ.initProbe();
+ state = (BuildAndPartitionTaskState) ctx.getStateObject(
+ new TaskId(new ActivityId(getOperatorId(), BUILD_AND_PARTITION_ACTIVITY_ID), partition));
+ state.hybridHJ.initProbe();
LOGGER.fine("OptimizedHybridHashJoin is starting the probe phase.");
}
@@ -399,45 +395,40 @@
@Override
public void close() throws HyracksDataException {
- state.hybridHJ.closeProbe(writer);
-
- BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
-
- rPartbuff.reset();
- for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus.nextSetBit(pid + 1)) {
-
- RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
- RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
-
- if (bReader == null || pReader
- == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
- continue;
+ try {
+ state.hybridHJ.closeProbe(writer);
+ BitSet partitionStatus = state.hybridHJ.getPartitionStatus();
+ rPartbuff.reset();
+ for (int pid = partitionStatus.nextSetBit(0); pid >= 0; pid = partitionStatus
+ .nextSetBit(pid + 1)) {
+ RunFileReader bReader = state.hybridHJ.getBuildRFReader(pid);
+ RunFileReader pReader = state.hybridHJ.getProbeRFReader(pid);
+ if (bReader == null || pReader == null) { //either of sides (or both) does not have any tuple, thus no need for joining (no potential match)
+ continue;
+ }
+ int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
+ int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
+ int beforeMax = (bSize > pSize) ? bSize : pSize;
+ joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
}
- int bSize = state.hybridHJ.getBuildPartitionSizeInTup(pid);
- int pSize = state.hybridHJ.getProbePartitionSizeInTup(pid);
- int beforeMax = (bSize > pSize) ? bSize : pSize;
- joinPartitionPair(state.hybridHJ, bReader, pReader, pid, beforeMax, 1, false);
+ } finally {
+ writer.close();
}
- writer.close();
LOGGER.fine("OptimizedHybridHashJoin closed its probe phase");
}
private void joinPartitionPair(OptimizedHybridHashJoin ohhj, RunFileReader buildSideReader,
RunFileReader probeSideReader, int pid, int beforeMax, int level, boolean wasReversed)
- throws HyracksDataException {
+ throws HyracksDataException {
ITuplePartitionComputer probeHpc = new FieldHashPartitionComputerFamily(probeKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
ITuplePartitionComputer buildHpc = new FieldHashPartitionComputerFamily(buildKeys,
hashFunctionGeneratorFactories).createPartitioner(level);
- long buildPartSize = wasReversed ?
- (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize()) :
- (ohhj
- .getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
- long probePartSize = wasReversed ?
- (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize()) :
- (ohhj
- .getProbePartitionSize(pid) / ctx.getInitialFrameSize());
+ long buildPartSize = wasReversed ? (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize())
+ : (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize());
+ long probePartSize = wasReversed ? (ohhj.getBuildPartitionSize(pid) / ctx.getInitialFrameSize())
+ : (ohhj.getProbePartitionSize(pid) / ctx.getInitialFrameSize());
LOGGER.fine("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId() + ") (pid "
+ pid + ") - (level " + level + ") - wasReversed " + wasReversed + " - BuildSize:\t"
@@ -448,12 +439,11 @@
if (!skipInMemoryHJ && (buildPartSize < state.memForJoin)
|| (probePartSize < state.memForJoin && !isLeftOuter)) {
int tabSize = -1;
- if (!forceRR && (isLeftOuter || (buildPartSize
- < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
+ if (!forceRR && (isLeftOuter || (buildPartSize < probePartSize))) { //Case 1.1 - InMemHJ (wout Role-Reversal)
LOGGER.fine("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
+ level + "]");
- tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid) : ohhj
- .getBuildPartitionSizeInTup(pid);
+ tabSize = wasReversed ? ohhj.getProbePartitionSizeInTup(pid)
+ : ohhj.getBuildPartitionSizeInTup(pid);
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
@@ -465,8 +455,8 @@
LOGGER.fine(
"\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ WITH RoleReversal - [Level "
+ level + "]");
- tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid) : ohhj
- .getProbePartitionSizeInTup(pid);
+ tabSize = wasReversed ? ohhj.getBuildPartitionSizeInTup(pid)
+ : ohhj.getProbePartitionSizeInTup(pid);
if (tabSize == 0) {
throw new HyracksDataException(
"Trying to join an empty partition. Invalid table size for inMemoryHashJoin.");
@@ -480,8 +470,7 @@
else {
LOGGER.fine("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
OptimizedHybridHashJoin rHHj;
- if (!forceRR && (isLeftOuter
- || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
+ if (!forceRR && (isLeftOuter || buildPartSize < probePartSize)) { //Case 2.1 - Recursive HHJ (wout Role-Reversal)
LOGGER.fine("\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ level + "]");
int n = getNumberOfPartitions(state.memForJoin, (int) buildPartSize, fudgeFactor,
@@ -513,13 +502,12 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitionStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
- * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.1.1 - Keep applying HHJ
LOGGER.fine(
"\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ level + "]");
- for (int rPid = rPStatus.nextSetBit(0);
- rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+ .nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -527,16 +515,15 @@
continue;
}
- joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1),
- false); //checked-confirmed
+ joinPartitionPair(rHHj, rbrfw, rprfw, rPid, afterMax, (level + 1), false); //checked-confirmed
}
} else { //Case 2.1.2 - Switch to NLJ
LOGGER.fine(
"\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
+ level + "]");
- for (int rPid = rPStatus.nextSetBit(0);
- rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+ .nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -585,12 +572,11 @@
: maxAfterProbeSize;
BitSet rPStatus = rHHj.getPartitionStatus();
- if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD
- * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
+ if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) { //Case 2.2.1 - Keep applying HHJ
LOGGER.fine("\t\t>>>Case 2.2.1 - KEEP APPLYING RecursiveHHJ WITH RoleReversal - [Level "
+ level + "]");
- for (int rPid = rPStatus.nextSetBit(0);
- rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+ .nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -598,15 +584,14 @@
continue;
}
- joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1),
- true); //checked-confirmed
+ joinPartitionPair(rHHj, rprfw, rbrfw, rPid, afterMax, (level + 1), true); //checked-confirmed
}
} else { //Case 2.2.2 - Switch to NLJ
LOGGER.fine(
"\t\t>>>Case 2.2.2 - SWITCHED to NLJ RecursiveHHJ WITH RoleReversal - [Level "
+ level + "]");
- for (int rPid = rPStatus.nextSetBit(0);
- rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
+ for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus
+ .nextSetBit(rPid + 1)) {
RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
RunFileReader rprfw = rHHj.getProbeRFReader(rPid);
@@ -644,8 +629,7 @@
bReader.open();
rPartbuff.reset();
while (bReader.nextFrame(rPartbuff)) {
- ByteBuffer copyBuffer = ctx
- .allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
+ ByteBuffer copyBuffer = ctx.allocateFrame(rPartbuff.getFrameSize()); //We need to allocate a copyBuffer, because this buffer gets added to the buffers list in the InMemoryHashJoin
FrameUtils.copyAndFlip(rPartbuff.getBuffer(), copyBuffer);
joiner.build(copyBuffer);
rPartbuff.reset();
@@ -665,10 +649,9 @@
private void applyNestedLoopJoin(RecordDescriptor outerRd, RecordDescriptor innerRd, int memorySize,
RunFileReader outerReader, RunFileReader innerReader, ITuplePairComparator nljComparator,
boolean reverse) throws HyracksDataException {
- NestedLoopJoin nlj = new NestedLoopJoin(ctx,
- new FrameTupleAccessor(outerRd),
- new FrameTupleAccessor(innerRd), nljComparator, memorySize,
- predEvaluator, isLeftOuter, nullWriters1);
+ NestedLoopJoin nlj = new NestedLoopJoin(ctx, new FrameTupleAccessor(outerRd),
+ new FrameTupleAccessor(innerRd), nljComparator, memorySize, predEvaluator, isLeftOuter,
+ nullWriters1);
nlj.setIsReversed(reverse);
IFrame cacheBuff = new VSizeFrame(ctx);
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 2398372..0c647d7 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -49,9 +49,9 @@
writer.open();
try {
appender.flush(writer, false);
- } catch (Exception e) {
+ } catch (Throwable th) {
writer.fail();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
} finally {
writer.close();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
index b9c2fb1..f8a8a67 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/MaterializerTaskState.java
@@ -52,8 +52,8 @@
}
public void open(IHyracksTaskContext ctx) throws HyracksDataException {
- FileReference file = ctx.getJobletContext().createManagedWorkspaceFile(
- MaterializerTaskState.class.getSimpleName());
+ FileReference file = ctx.getJobletContext()
+ .createManagedWorkspaceFile(MaterializerTaskState.class.getSimpleName());
out = new RunFileWriter(file, ctx.getIOManager());
out.open();
}
@@ -68,16 +68,19 @@
public void writeOut(IFrameWriter writer, IFrame frame) throws HyracksDataException {
RunFileReader in = out.createDeleteOnCloseReader();
- writer.open();
try {
- in.open();
- while (in.nextFrame(frame)) {
- writer.nextFrame(frame.getBuffer());
+ writer.open();
+ try {
+ in.open();
+ while (in.nextFrame(frame)) {
+ writer.nextFrame(frame.getBuffer());
+ }
+ } finally {
+ in.close();
}
- in.close();
- } catch (Exception e) {
+ } catch (Throwable th) {
writer.fail();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
} finally {
writer.close();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
index 04b893a..feff13c 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
@@ -71,8 +71,8 @@
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
- SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(new ActivityId(odId,
- SPLITTER_MATERIALIZER_ACTIVITY_ID));
+ SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
+ new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
builder.addActivity(this, sma);
builder.addSourceEdge(0, sma, 0);
int taskOutputIndex = 0;
@@ -88,8 +88,8 @@
int activityId = MATERIALIZE_READER_ACTIVITY_ID;
for (int i = 0; i < outputArity; i++) {
if (outputMaterializationFlags[i]) {
- MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(new ActivityId(odId,
- activityId));
+ MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+ new ActivityId(odId, activityId));
builder.addActivity(this, mra);
builder.addTargetEdge(i, mra, 0);
builder.addBlockingEdge(sma, mra);
@@ -113,15 +113,17 @@
return new AbstractUnaryInputOperatorNodePushable() {
private MaterializerTaskState state;
private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+ private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
@Override
public void open() throws HyracksDataException {
if (requiresMaterialization) {
- state = new MaterializerTaskState(ctx.getJobletContext().getJobId(), new TaskId(
- getActivityId(), partition));
+ state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
+ new TaskId(getActivityId(), partition));
state.open(ctx);
}
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ isOpen[i] = true;
writers[i].open();
}
}
@@ -138,19 +140,50 @@
@Override
public void close() throws HyracksDataException {
- if (requiresMaterialization) {
- state.close();
- ctx.setStateObject(state);
+ HyracksDataException hde = null;
+ try {
+ if (requiresMaterialization) {
+ state.close();
+ ctx.setStateObject(state);
+ }
+ } finally {
+ for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+ if (isOpen[i]) {
+ try {
+ writers[i].close();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ }
}
- for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
- writers[i].close();
+ if (hde != null) {
+ throw hde;
}
}
@Override
public void fail() throws HyracksDataException {
+ HyracksDataException hde = null;
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
- writers[i].fail();
+ if (isOpen[i]) {
+ try {
+ writers[i].fail();
+ } catch (Throwable th) {
+ if (hde == null) {
+ hde = new HyracksDataException(th);
+ } else {
+ hde.addSuppressed(th);
+ }
+ }
+ }
+ }
+ if (hde != null) {
+ throw hde;
}
}
@@ -172,21 +205,21 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
- getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+ MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+ new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
state.writeOut(writer, new VSizeFrame(ctx));
}
@Override
public void deinitialize() throws HyracksDataException {
numberOfActiveMaterializeReaders--;
- MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(new TaskId(new ActivityId(
- getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+ MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+ new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
if (numberOfActiveMaterializeReaders == 0) {
state.deleteFile();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
index 8d54c39..74f1cb4 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/InMemorySortOperatorDescriptor.java
@@ -87,9 +87,6 @@
private static class SortTaskState extends AbstractStateObject {
private FrameSorterMergeSort frameSorter;
- public SortTaskState() {
- }
-
private SortTaskState(JobId jobId, TaskId taskId) {
super(jobId, taskId);
}
@@ -165,14 +162,14 @@
IOperatorNodePushable op = new AbstractUnaryOutputSourceOperatorNodePushable() {
@Override
public void initialize() throws HyracksDataException {
- writer.open();
try {
- SortTaskState state = (SortTaskState) ctx.getStateObject(new TaskId(new ActivityId(
- getOperatorId(), SORT_ACTIVITY_ID), partition));
+ writer.open();
+ SortTaskState state = (SortTaskState) ctx.getStateObject(
+ new TaskId(new ActivityId(getOperatorId(), SORT_ACTIVITY_ID), partition));
state.frameSorter.flush(writer);
- } catch (Exception e) {
+ } catch (Throwable th) {
writer.fail();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
} finally {
writer.close();
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
index 3457fe8..289b879 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/union/UnionAllOperatorDescriptor.java
@@ -34,7 +34,8 @@
import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
public class UnionAllOperatorDescriptor extends AbstractOperatorDescriptor {
- public UnionAllOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs, RecordDescriptor recordDescriptor) {
+ public UnionAllOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInputs,
+ RecordDescriptor recordDescriptor) {
super(spec, nInputs, 1);
recordDescriptors[0] = recordDescriptor;
}
@@ -61,7 +62,7 @@
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new UnionOperator(ctx, inRecordDesc);
}
@@ -69,9 +70,7 @@
private class UnionOperator extends AbstractUnaryOutputOperatorNodePushable {
private int nOpened;
-
private int nClosed;
-
private boolean failed;
public UnionOperator(IHyracksTaskContext ctx, RecordDescriptor inRecordDesc) {
@@ -106,7 +105,7 @@
@Override
public void fail() throws HyracksDataException {
synchronized (UnionOperator.this) {
- if (failed) {
+ if (!failed) {
writer.fail();
}
failed = true;
@@ -117,6 +116,7 @@
public void close() throws HyracksDataException {
synchronized (UnionOperator.this) {
if (++nClosed == inputArity) {
+ // a single close
writer.close();
}
}
diff --git a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
index b0b1d52..431a4f4 100644
--- a/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
+++ b/hyracks/hyracks-examples/btree-example/btreehelper/src/main/java/org/apache/hyracks/examples/btree/helper/DataGenOperatorDescriptor.java
@@ -79,8 +79,8 @@
@Override
public void initialize() throws HyracksDataException {
- writer.open();
try {
+ writer.open();
for (int i = 0; i < numRecords; i++) {
tb.reset();
for (int j = 0; j < recDesc.getFieldCount(); j++) {
@@ -90,14 +90,15 @@
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
appender.flush(writer, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
- throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
+ throw new HyracksDataException("Record size (" + tb.getSize()
+ + ") larger than frame size (" + appender.getBuffer().capacity() + ")");
}
}
}
appender.flush(writer, true);
- } catch (Exception e) {
+ } catch (Throwable th) {
writer.fail();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
} finally {
writer.close();
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
index ed7c08d..5d300f4 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/rewriting/SuperActivityRewritingTest.java
@@ -84,9 +84,11 @@
public void initialize() throws HyracksDataException {
try {
writer.open();
+ } catch (Throwable th) {
+ writer.fail();
+ throw new HyracksDataException(th);
+ } finally {
writer.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
}
}
};
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
index 0d06655..26292f8 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/dataflow/HDFSReadOperatorDescriptor.java
@@ -26,7 +26,6 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -44,7 +43,7 @@
* To use this operator, a user need to provide an IKeyValueParserFactory implementation which convert
* key-value pairs into tuples.
*/
-@SuppressWarnings({ "deprecation", "rawtypes" })
+@SuppressWarnings({ "rawtypes" })
public class HDFSReadOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 1L;
@@ -91,7 +90,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
final InputSplit[] inputSplits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -102,46 +101,50 @@
public void initialize() throws HyracksDataException {
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
+ writer.open();
Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
JobConf conf = confFactory.getConf();
conf.setClassLoader(ctx.getJobletContext().getClassLoader());
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
- writer.open();
- parser.open(writer);
- InputFormat inputFormat = conf.getInputFormat();
- for (int i = 0; i < inputSplits.length; i++) {
- /**
- * read all the partitions scheduled to the current node
- */
- if (scheduledLocations[i].equals(nodeName)) {
+ try {
+ parser.open(writer);
+ InputFormat inputFormat = conf.getInputFormat();
+ for (int i = 0; i < inputSplits.length; i++) {
/**
- * pick an unread split to read
- * synchronize among simultaneous partitions in the same machine
+ * read all the partitions scheduled to the current node
*/
- synchronized (executed) {
- if (executed[i] == false) {
- executed[i] = true;
- } else {
- continue;
+ if (scheduledLocations[i].equals(nodeName)) {
+ /**
+ * pick an unread split to read
+ * synchronize among simultaneous partitions in the same machine
+ */
+ synchronized (executed) {
+ if (executed[i] == false) {
+ executed[i] = true;
+ } else {
+ continue;
+ }
+ }
+
+ /**
+ * read the split
+ */
+ RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
+ Object key = reader.createKey();
+ Object value = reader.createValue();
+ while (reader.next(key, value) == true) {
+ parser.parse(key, value, writer, inputSplits[i].toString());
}
}
-
- /**
- * read the split
- */
- RecordReader reader = inputFormat.getRecordReader(inputSplits[i], conf, Reporter.NULL);
- Object key = reader.createKey();
- Object value = reader.createValue();
- while (reader.next(key, value) == true) {
- parser.parse(key, value, writer, inputSplits[i].toString());
- }
}
+ } finally {
+ parser.close(writer);
}
- parser.close(writer);
- writer.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
+ } catch (Throwable th) {
+ writer.fail();
+ throw new HyracksDataException(th);
} finally {
+ writer.close();
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
index 28c6cfe..d69191d 100644
--- a/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
+++ b/hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs2/dataflow/HDFSReadOperatorDescriptor.java
@@ -30,7 +30,6 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
-
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -104,7 +103,7 @@
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
- throws HyracksDataException {
+ throws HyracksDataException {
final List<FileSplit> inputSplits = splitsFactory.getSplits();
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@@ -116,11 +115,11 @@
public void initialize() throws HyracksDataException {
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
+ writer.open();
Thread.currentThread().setContextClassLoader(ctx.getJobletContext().getClassLoader());
Job job = confFactory.getConf();
job.getConfiguration().setClassLoader(ctx.getJobletContext().getClassLoader());
IKeyValueParser parser = tupleParserFactory.createKeyValueParser(ctx);
- writer.open();
InputFormat inputFormat = ReflectionUtils.newInstance(job.getInputFormatClass(),
job.getConfiguration());
int size = inputSplits.size();
@@ -155,10 +154,11 @@
}
}
parser.close(writer);
- writer.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
+ } catch (Throwable th) {
+ writer.fail();
+ throw new HyracksDataException(th);
} finally {
+ writer.close();
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
diff --git a/hyracks/hyracks-storage-am-btree/pom.xml b/hyracks/hyracks-storage-am-btree/pom.xml
index cf14c3b..fba9d2c 100644
--- a/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks/hyracks-storage-am-btree/pom.xml
@@ -16,57 +16,68 @@
! specific language governing permissions and limitations
! under the License.
!-->
-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-storage-am-btree</artifactId>
- <name>hyracks-storage-am-btree</name>
-
- <parent>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- </parent>
-
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
-
-
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-common</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-storage-am-btree</artifactId>
+ <name>hyracks-storage-am-btree</name>
+ <parent>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ </parent>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
+ <dependencies>
<dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-common</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-dataflow-std</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- </dependencies>
-</project>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-common</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-common</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>2.0.2-beta</version>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
new file mode 100644
index 0000000..b5c14c1
--- /dev/null
+++ b/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.comm.IFrameTupleAppender;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
+import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITreeIndex;
+import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ BTreeUtils.class, FrameTupleAccessor.class, ArrayTupleBuilder.class,
+ IndexSearchOperatorNodePushable.class, FrameUtils.class, FrameTupleAppender.class })
+public class FramewriterTest {
+
+ private CountAnswer openException = new CountAndThrowException("Exception in open()");
+ private CountAnswer nextFrameException = new CountAndThrowException("Exception in nextFrame()");
+ private CountAnswer failException = new CountAndThrowException("Exception in fail()");
+ private CountAnswer closeException = new CountAndThrowException("Exception in close()");
+ private CountAnswer openError = new CountAndThrowError("Exception in open()");
+ private CountAnswer nextFrameError = new CountAndThrowError("Exception in nextFrame()");
+ private CountAnswer failError = new CountAndThrowError("Exception in fail()");
+ private CountAnswer closeError = new CountAndThrowError("Exception in close()");
+ private CountAnswer openNormal = new CountAnswer();
+ private CountAnswer nextFrameNormal = new CountAnswer();
+ private CountAnswer failNormal = new CountAnswer();
+ private CountAnswer closeNormal = new CountAnswer();
+
+ private int successes = 0;
+ private int failures = 0;
+ private static final int BUFFER_SIZE = 32000;
+ private static final int RECORDS_PER_FRAME = 3;
+ public static ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(BUFFER_SIZE);
+ private static final int NUMBER_OF_APPENDERS = 2;
+ public int counter = 0;
+
+ public boolean validate(boolean finished) {
+ // get number of open calls
+ int openCount = openException.getCallCount() + openNormal.getCallCount() + openError.getCallCount();
+ int nextFrameCount = nextFrameException.getCallCount() + nextFrameNormal.getCallCount()
+ + nextFrameError.getCallCount();
+ int failCount = failException.getCallCount() + failNormal.getCallCount() + failError.getCallCount();
+ int closeCount = closeException.getCallCount() + closeNormal.getCallCount() + closeError.getCallCount();
+
+ if (failCount > 1 || closeCount > 1 || openCount > 1) {
+ failures++;
+ return false;
+ }
+ if (openCount == 0 && (nextFrameCount > 0 || failCount > 0 || closeCount > 0)) {
+ failures++;
+ return false;
+ }
+ if (finished) {
+ if (closeCount == 0 && (nextFrameCount > 0 || failCount > 0 || openCount > 0)) {
+ failures++;
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ public MultiComparator mockMultiComparator() {
+ MultiComparator mc = Mockito.mock(MultiComparator.class);
+ return mc;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ // Mock static methods
+ PowerMockito.mockStatic(BTreeUtils.class);
+ PowerMockito.when(BTreeUtils.getSearchMultiComparator(Matchers.any(), Matchers.any()))
+ .thenReturn(mockMultiComparator());
+ PowerMockito.mockStatic(FrameUtils.class);
+
+ // Custom implementation for FrameUtils that push to next frame immediately
+ PowerMockito.when(
+ FrameUtils.appendToWriter(Matchers.any(IFrameWriter.class), Matchers.any(IFrameTupleAppender.class),
+ Matchers.any(IFrameTupleAccessor.class), Matchers.anyInt(), Matchers.anyInt()))
+ .thenAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ IFrameWriter writer = (IFrameWriter) args[0];
+ writer.nextFrame(EMPTY_BUFFER);
+ return BUFFER_SIZE;
+ }
+ });
+
+ // create global mock for FrameTupleAccessor, ArrayTupleBuilder
+ FrameTupleAccessor frameAccessor = Mockito.mock(FrameTupleAccessor.class);
+ Mockito.when(frameAccessor.getTupleCount()).thenReturn(RECORDS_PER_FRAME);
+
+ // Global custom implementations for FrameTupleAppender
+ // since we have two appenders, then we need to test each test twice
+ FrameTupleAppender[] appenders = mockAppenders();
+
+ // Mock all instances of a class <Note that you need to prepare the class calling this constructor as well>
+ PowerMockito.whenNew(FrameTupleAccessor.class).withAnyArguments().thenReturn(frameAccessor);
+ PowerMockito.whenNew(FrameTupleAppender.class).withAnyArguments().thenAnswer(new Answer<FrameTupleAppender>() {
+ @Override
+ public FrameTupleAppender answer(InvocationOnMock invocation) throws Throwable {
+ counter++;
+ if (counter % 2 == 1) {
+ return appenders[0];
+ }
+ return appenders[1];
+ }
+ });
+ }
+
+ public static FrameTupleAppender[] mockAppenders() throws HyracksDataException {
+ FrameTupleAppender[] appenders = new FrameTupleAppender[2];
+ appenders[0] = Mockito.mock(FrameTupleAppender.class);
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ IFrameWriter writer = (IFrameWriter) args[0];
+ writer.nextFrame(EMPTY_BUFFER);
+ return null;
+ }
+ }).when(appenders[0]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+
+ appenders[1] = Mockito.mock(FrameTupleAppender.class);
+ Mockito.doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ throw new HyracksDataException("couldn't flush frame");
+ }
+ }).when(appenders[1]).flush(Matchers.any(IFrameWriter.class), Matchers.anyBoolean());
+
+ return appenders;
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ private void resetAllCounters() {
+ openException.reset();
+ nextFrameException.reset();
+ failException.reset();
+ closeException.reset();
+ openNormal.reset();
+ nextFrameNormal.reset();
+ failNormal.reset();
+ closeNormal.reset();
+ openError.reset();
+ nextFrameError.reset();
+ failError.reset();
+ closeError.reset();
+ }
+
+ @Test
+ public void test() {
+ try {
+ testBTreeSearchOperatorNodePushable();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ }
+ System.out.println("Number of passed tests: " + successes);
+ System.out.println("Number of failed tests: " + failures);
+ Assert.assertEquals(failures, 0);
+ }
+
+ private void testBTreeSearchOperatorNodePushable() throws Exception {
+ /*
+ * coverage
+ * in open(){
+ * writer.open() succeeds vs. throws exception vs. throws error
+ * indexHelper.open() succeeds vs. throws exception
+ * createAccessor() succeeds vs. throws exception
+ * }
+ * in nextFrame(){
+ * indexAccessor.search succeeds vs. throws exception
+ * writeSearchResults succeeds vs. throws exception vs. throws error
+ * }
+ * in fail(){
+ * writer.fail() succeeds, throws exception, or throws error
+ * }
+ * in close(){
+ * appender.close() succeeds, throws exception, or throws error
+ * }
+ */
+ int i = 0;
+ counter = 0;
+ while (i < NUMBER_OF_APPENDERS) {
+ i++;
+ ByteBuffer buffer = mockByteBuffer();
+ IFrameWriter[] outPutFrameWriters = createOutputWriters();
+ for (IFrameWriter outputWriter : outPutFrameWriters) {
+ IFrameWriter[] underTest = createWriters();
+ for (IFrameWriter writer : underTest) {
+ ((AbstractUnaryOutputOperatorNodePushable) writer).setOutputFrameWriter(0, outputWriter,
+ mockRecordDescriptor());
+ testWriter(writer, buffer);
+ }
+ }
+ counter = i;
+ }
+ }
+
+ private ByteBuffer mockByteBuffer() {
+ return ByteBuffer.allocate(BUFFER_SIZE);
+ }
+
+ /**
+ * @return a list of writers to test. these writers can be of the same type but behave differently based on included mocks
+ * @throws HyracksDataException
+ * @throws IndexException
+ */
+ public IFrameWriter[] createWriters() throws HyracksDataException, IndexException {
+ ArrayList<BTreeSearchOperatorNodePushable> writers = new ArrayList<BTreeSearchOperatorNodePushable>();
+ AbstractTreeIndexOperatorDescriptor[] opDescs = mockIndexOpDesc();
+ IRecordDescriptorProvider[] recordDescProviders = mockRecDescProviders();
+ int partition = 0;
+ IHyracksTaskContext[] ctxs = mockIHyracksTaskContext();
+ int[] keys = { 0 };
+ boolean lowKeyInclusive = true;
+ boolean highKeyInclusive = true;
+ for (AbstractTreeIndexOperatorDescriptor opDesc : opDescs) {
+ for (IRecordDescriptorProvider recordDescProvider : recordDescProviders) {
+ for (IHyracksTaskContext ctx : ctxs) {
+ BTreeSearchOperatorNodePushable writer = new BTreeSearchOperatorNodePushable(opDesc, ctx, partition,
+ recordDescProvider, keys, keys, lowKeyInclusive, highKeyInclusive, keys, keys);
+ writers.add(writer);
+ }
+ }
+ }
+ // Create the framewriter using the mocks
+ return writers.toArray(new IFrameWriter[writers.size()]);
+ }
+
+ private IHyracksTaskContext[] mockIHyracksTaskContext() throws HyracksDataException {
+ IHyracksTaskContext ctx = Mockito.mock(IHyracksTaskContext.class);
+ Mockito.when(ctx.allocateFrame()).thenReturn(mockByteBuffer());
+ Mockito.when(ctx.allocateFrame(Mockito.anyInt())).thenReturn(mockByteBuffer());
+ Mockito.when(ctx.getInitialFrameSize()).thenReturn(BUFFER_SIZE);
+ Mockito.when(ctx.reallocateFrame(Mockito.any(), Mockito.anyInt(), Mockito.anyBoolean()))
+ .thenReturn(mockByteBuffer());
+ return new IHyracksTaskContext[] { ctx };
+ }
+
+ private IRecordDescriptorProvider[] mockRecDescProviders() {
+ RecordDescriptor rDesc = mockRecordDescriptor();
+ IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class);
+ Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+ Mockito.when(rDescProvider.getOutputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc);
+ return new IRecordDescriptorProvider[] { rDescProvider };
+ }
+
+ @SuppressWarnings("rawtypes")
+ private RecordDescriptor mockRecordDescriptor() {
+ ISerializerDeserializer serde = Mockito.mock(ISerializerDeserializer.class);
+ RecordDescriptor rDesc = new RecordDescriptor(new ISerializerDeserializer[] { serde });
+ return rDesc;
+ }
+
+ public ITreeIndex[] mockIndexes() throws HyracksDataException, IndexException {
+ IIndexAccessor[] indexAccessors = mockIndexAccessors();
+ ITreeIndex[] indexes = new ITreeIndex[indexAccessors.length * 2];
+ int j = 0;
+ for (int i = 0; i < indexAccessors.length; i++) {
+ indexes[j] = Mockito.mock(ITreeIndex.class);
+ Mockito.when(indexes[j].createAccessor(Mockito.any(), Mockito.any())).thenReturn(indexAccessors[i]);
+ j++;
+ indexes[j] = Mockito.mock(ITreeIndex.class);
+ Mockito.when(indexes[j].createAccessor(Mockito.any(), Mockito.any()))
+ .thenThrow(new HyracksDataException("failed to create accessor"));
+ j++;
+ }
+ return indexes;
+ }
+
+ private IIndexAccessor[] mockIndexAccessors() throws HyracksDataException, IndexException {
+ IIndexCursor[] cursors = mockIndexCursors();
+ IIndexAccessor[] accessors = new IIndexAccessor[cursors.length * 2];
+ int j = 0;
+ for (int i = 0; i < cursors.length; i++) {
+ IIndexCursor cursor = cursors[i];
+ IIndexAccessor accessor = Mockito.mock(IIndexAccessor.class);
+ Mockito.when(accessor.createSearchCursor(Matchers.anyBoolean())).thenReturn(cursor);
+ accessors[j] = accessor;
+ j++;
+ accessor = Mockito.mock(IIndexAccessor.class);
+ Mockito.when(accessor.createSearchCursor(Matchers.anyBoolean())).thenReturn(cursor);
+ Mockito.doAnswer(new Answer<Object>() {
+ private int k = 0;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ k++;
+ if (k % 2 == 0) {
+ throw new HyracksDataException("Couldn't search index");
+ }
+ return null;
+ }
+ }).when(accessor).search(Matchers.any(), Matchers.any());
+ accessors[j] = accessor;
+ j++;
+ }
+
+ return accessors;
+ }
+
+ private IIndexCursor[] mockIndexCursors() throws HyracksDataException, IndexException {
+ ITupleReference[] tuples = mockTuples();
+ IIndexCursor[] cursors = new IIndexCursor[tuples.length * 2];
+ int j = 0;
+ for (int i = 0; i < tuples.length; i++) {
+ IIndexCursor cursor = Mockito.mock(IIndexCursor.class);
+ Mockito.when(cursor.hasNext()).thenReturn(true, true, false);
+ Mockito.when(cursor.getTuple()).thenReturn(tuples[i]);
+ cursors[j] = cursor;
+ j++;
+ cursor = Mockito.mock(IIndexCursor.class);
+ Mockito.when(cursor.hasNext()).thenReturn(true, true, false);
+ Mockito.when(cursor.getTuple()).thenReturn(tuples[i]);
+ Mockito.doThrow(new HyracksDataException("Failed to close cursor")).when(cursor).close();
+ cursors[j] = cursor;
+ j++;
+ }
+ return cursors;
+ }
+
+ private ITupleReference[] mockTuples() {
+ ITupleReference tuple = Mockito.mock(ITupleReference.class);
+ return new ITupleReference[] { tuple };
+ }
+
+ public IIndexDataflowHelper[] mockIndexHelpers() throws HyracksDataException, IndexException {
+ ITreeIndex[] indexes = mockIndexes();
+ IIndexDataflowHelper[] indexHelpers = new IIndexDataflowHelper[indexes.length * 2];
+ int j = 0;
+ for (int i = 0; i < indexes.length; i++) {
+ // normal
+ indexHelpers[j] = Mockito.mock(IIndexDataflowHelper.class);
+ Mockito.when(indexHelpers[j].getIndexInstance()).thenReturn(indexes[i]);
+
+ // throws exception when opened
+ j++;
+ indexHelpers[j] = Mockito.mock(IIndexDataflowHelper.class);
+ Mockito.doThrow(new HyracksDataException("couldn't open index")).when(indexHelpers[j]).open();
+ Mockito.when(indexHelpers[j].getIndexInstance()).thenReturn(null);
+
+ j++;
+ }
+ return indexHelpers;
+ }
+
+ public IIndexDataflowHelperFactory[] mockIndexHelperFactories() throws HyracksDataException, IndexException {
+ IIndexDataflowHelper[] helpers = mockIndexHelpers();
+ IIndexDataflowHelperFactory[] indexHelperFactories = new IIndexDataflowHelperFactory[helpers.length];
+ for (int i = 0; i < helpers.length; i++) {
+ indexHelperFactories[i] = Mockito.mock(IIndexDataflowHelperFactory.class);
+ Mockito.when(
+ indexHelperFactories[i].createIndexDataflowHelper(Mockito.any(), Mockito.any(), Mockito.anyInt()))
+ .thenReturn(helpers[i]);
+ }
+ return indexHelperFactories;
+ }
+
+ public AbstractTreeIndexOperatorDescriptor[] mockIndexOpDesc() throws HyracksDataException, IndexException {
+ IIndexDataflowHelperFactory[] indexDataflowHelperFactories = mockIndexHelperFactories();
+ ISearchOperationCallbackFactory[] searchOpCallbackFactories = mockSearchOpCallbackFactories();
+ AbstractTreeIndexOperatorDescriptor[] opDescs = new AbstractTreeIndexOperatorDescriptor[indexDataflowHelperFactories.length
+ * searchOpCallbackFactories.length];
+ int k = 0;
+ for (int i = 0; i < indexDataflowHelperFactories.length; i++) {
+ for (int j = 0; j < searchOpCallbackFactories.length; j++) {
+ AbstractTreeIndexOperatorDescriptor opDesc = Mockito.mock(AbstractTreeIndexOperatorDescriptor.class);
+ Mockito.when(opDesc.getIndexDataflowHelperFactory()).thenReturn(indexDataflowHelperFactories[i]);
+ Mockito.when(opDesc.getRetainInput()).thenReturn(false);
+ Mockito.when(opDesc.getRetainNull()).thenReturn(false);
+ Mockito.when(opDesc.getSearchOpCallbackFactory()).thenReturn(searchOpCallbackFactories[j]);
+ opDescs[k] = opDesc;
+ k++;
+ }
+ }
+ return opDescs;
+ }
+
+ private ISearchOperationCallbackFactory[] mockSearchOpCallbackFactories() throws HyracksDataException {
+ ISearchOperationCallback searchOpCallback = mockSearchOpCallback();
+ ISearchOperationCallbackFactory searchOpCallbackFactory = Mockito.mock(ISearchOperationCallbackFactory.class);
+ Mockito.when(searchOpCallbackFactory.createSearchOperationCallback(Mockito.anyLong(), Mockito.any()))
+ .thenReturn(searchOpCallback);
+ return new ISearchOperationCallbackFactory[] { searchOpCallbackFactory };
+ }
+
+ private ISearchOperationCallback mockSearchOpCallback() {
+ ISearchOperationCallback opCallback = Mockito.mock(ISearchOperationCallback.class);
+ return opCallback;
+ }
+
+ public class CountAnswer implements Answer<Object> {
+ protected int count = 0;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ return null;
+ }
+
+ public int getCallCount() {
+ return count;
+ }
+
+ public void reset() {
+ count = 0;
+ }
+ }
+
+ public class CountAndThrowException extends CountAnswer {
+ private String errorMessage;
+
+ public CountAndThrowException(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ throw new HyracksDataException(errorMessage);
+ }
+ }
+
+ public class CountAndThrowError extends CountAnswer {
+ private String errorMessage;
+
+ public CountAndThrowError(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ count++;
+ throw new UnknownError(errorMessage);
+ }
+ }
+
+ public IFrameWriter[] createOutputWriters() throws Exception {
+ CountAnswer[] opens = new CountAnswer[] { openNormal, openException, openError };
+ CountAnswer[] nextFrames = new CountAnswer[] { nextFrameNormal, nextFrameException, nextFrameError };
+ CountAnswer[] fails = new CountAnswer[] { failNormal, failException, failError };
+ CountAnswer[] closes = new CountAnswer[] { closeNormal, closeException, closeError };
+ List<IFrameWriter> outputWriters = new ArrayList<IFrameWriter>();
+ for (CountAnswer openAnswer : opens) {
+ for (CountAnswer nextFrameAnswer : nextFrames) {
+ for (CountAnswer failAnswer : fails) {
+ for (CountAnswer closeAnswer : closes) {
+ IFrameWriter writer = Mockito.mock(IFrameWriter.class);
+ Mockito.doAnswer(openAnswer).when(writer).open();
+ Mockito.doAnswer(nextFrameAnswer).when(writer).nextFrame(Mockito.any());
+ Mockito.doAnswer(failAnswer).when(writer).fail();
+ Mockito.doAnswer(closeAnswer).when(writer).close();
+ outputWriters.add(writer);
+ }
+ }
+ }
+ }
+ return outputWriters.toArray(new IFrameWriter[outputWriters.size()]);
+ }
+
+ public void testWriter(IFrameWriter writer, ByteBuffer buffer) throws Exception {
+ resetAllCounters();
+ boolean failed = !validate(false);
+ if (failed) {
+ return;
+ }
+ try {
+ writer.open();
+ failed = !validate(false);
+ if (failed) {
+ return;
+ }
+ for (int i = 0; i < 10; i++) {
+ writer.nextFrame(buffer);
+ failed = !validate(false);
+ if (failed) {
+ return;
+ }
+ }
+ } catch (Throwable th1) {
+ try {
+ failed = !validate(false);
+ if (failed) {
+ return;
+ }
+ writer.fail();
+ failed = !validate(false);
+ if (failed) {
+ return;
+ }
+ } catch (Throwable th2) {
+ failed = !validate(false);
+ if (failed) {
+ return;
+ }
+ }
+ } finally {
+ if (!failed) {
+ try {
+ failed = !validate(false);
+ if (failed) {
+ return;
+ }
+ writer.close();
+ failed = !validate(true);
+ if (failed) {
+ return;
+ }
+ } catch (Throwable th3) {
+ failed = !validate(true);
+ if (failed) {
+ return;
+ }
+ }
+ }
+ }
+ successes++;
+ }
+}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
index 81d7834..9ee7969 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -25,8 +25,14 @@
public interface IIndexDataflowHelper {
public void create() throws HyracksDataException;
+ /*
+ * If close throws an exception, it means that the index was not closed successfully.
+ */
public void close() throws HyracksDataException;
+ /*
+ * If open throws an exception, it means that the index was not opened successfully.
+ */
public void open() throws HyracksDataException;
public void destroy() throws HyracksDataException;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 82f9fd6..631b2f1 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -33,8 +33,7 @@
import org.apache.hyracks.storage.am.common.api.IndexException;
import org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
-public class IndexBulkLoadOperatorNodePushable extends
- AbstractUnaryInputUnaryOutputOperatorNodePushable {
+public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
protected final IIndexOperatorDescriptor opDesc;
protected final IHyracksTaskContext ctx;
protected final float fillFactor;
@@ -48,15 +47,12 @@
protected IRecordDescriptorProvider recDescProvider;
protected PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
- public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc,
- IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
- float fillFactor, boolean verifyInput, long numElementsHint,
- boolean checkIfEmptyIndex,
- IRecordDescriptorProvider recordDescProvider) {
+ public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
+ int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex, IRecordDescriptorProvider recordDescProvider) {
this.opDesc = opDesc;
this.ctx = ctx;
- this.indexHelper = opDesc.getIndexDataflowHelperFactory()
- .createIndexDataflowHelper(opDesc, ctx, partition);
+ this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
this.numElementsHint = numElementsHint;
@@ -68,19 +64,16 @@
@Override
public void open() throws HyracksDataException {
- RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(
- opDesc.getActivityId(), 0);
+ RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(recDesc);
indexHelper.open();
index = indexHelper.getIndexInstance();
try {
- bulkLoader = index.createBulkLoader(fillFactor, verifyInput,
- numElementsHint, checkIfEmptyIndex);
+ writer.open();
+ bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
} catch (Exception e) {
- indexHelper.close();
throw new HyracksDataException(e);
}
- writer.open();
}
@Override
@@ -105,15 +98,24 @@
public void close() throws HyracksDataException {
try {
bulkLoader.end();
- } catch (Exception e) {
- throw new HyracksDataException(e);
+ } catch (Throwable th) {
+ throw new HyracksDataException(th);
} finally {
- indexHelper.close();
+ if (index != null) {
+ // If index was opened!
+ try {
+ indexHelper.close();
+ } finally {
+ writer.close();
+ }
+ }
}
- writer.close();
}
@Override
public void fail() throws HyracksDataException {
+ if (index != null) {
+ writer.fail();
+ }
}
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 4434871..a2913f4 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -55,6 +55,7 @@
protected IIndexAccessor indexAccessor;
protected ITupleFilter tupleFilter;
protected IModificationOperationCallback modCallback;
+ protected IIndex index;
public IndexInsertUpdateDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op) {
@@ -71,12 +72,12 @@
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(inputRecDesc);
writeBuffer = new VSizeFrame(ctx);
- writer.open();
indexHelper.open();
- IIndex index = indexHelper.getIndexInstance();
+ index = indexHelper.getIndexInstance();
try {
- modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(indexHelper.getResourceName(),
- indexHelper.getResourceID(), index, ctx);
+ writer.open();
+ modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
+ indexHelper.getResourceName(), indexHelper.getResourceID(), index, ctx);
indexAccessor = index.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
@@ -84,7 +85,6 @@
frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
- indexHelper.close();
throw new HyracksDataException(e);
}
}
@@ -129,8 +129,8 @@
break;
}
default: {
- throw new HyracksDataException("Unsupported operation " + op
- + " in tree index InsertUpdateDelete operator");
+ throw new HyracksDataException(
+ "Unsupported operation " + op + " in tree index InsertUpdateDelete operator");
}
}
} catch (HyracksDataException e) {
@@ -147,15 +147,19 @@
@Override
public void close() throws HyracksDataException {
- try {
- writer.close();
- } finally {
- indexHelper.close();
+ if (index != null) {
+ try {
+ writer.close();
+ } finally {
+ indexHelper.close();
+ }
}
}
@Override
public void fail() throws HyracksDataException {
- writer.fail();
+ if (index != null) {
+ writer.fail();
+ }
}
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 3db94e8..568bde8 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -107,11 +107,10 @@
@Override
public void open() throws HyracksDataException {
- accessor = new FrameTupleAccessor(inputRecDesc);
writer.open();
indexHelper.open();
index = indexHelper.getIndexInstance();
-
+ accessor = new FrameTupleAccessor(inputRecDesc);
if (retainNull) {
int fieldCount = getFieldCount();
nullTupleBuild = new ArrayTupleBuilder(fieldCount);
@@ -141,7 +140,6 @@
frameTuple = new FrameTupleReference();
}
} catch (Exception e) {
- indexHelper.close();
throw new HyracksDataException(e);
}
}
@@ -164,13 +162,12 @@
dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
tb.addFieldEndOffset();
}
- FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0,
- tb.getSize());
+ FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
}
if (!matched && retainInput && retainNull) {
- FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex,
- nullTupleBuild.getFieldEndOffsets(), nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
+ FrameUtils.appendConcatToWriter(writer, appender, accessor, tupleIndex, nullTupleBuild.getFieldEndOffsets(),
+ nullTupleBuild.getByteArray(), 0, nullTupleBuild.getSize());
}
}
@@ -192,16 +189,46 @@
@Override
public void close() throws HyracksDataException {
- try {
- appender.flush(writer, true);
+ HyracksDataException closeException = null;
+ if (index != null) {
+ // if index == null, then the index open was not successful
+ try {
+ appender.flush(writer, true);
+ } catch (Throwable th) {
+ closeException = new HyracksDataException(th);
+ }
+
try {
cursor.close();
- } catch (Exception e) {
- throw new HyracksDataException(e);
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
}
+ try {
+ indexHelper.close();
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
+ }
+ }
+ try {
+ // will definitely be called regardless of exceptions
writer.close();
- } finally {
- indexHelper.close();
+ } catch (Throwable th) {
+ if (closeException == null) {
+ closeException = new HyracksDataException(th);
+ } else {
+ closeException.addSuppressed(th);
+ }
+ }
+ if (closeException != null) {
+ throw closeException;
}
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
index 8c73272..08775bb 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorNodePushable.java
@@ -56,12 +56,12 @@
try {
ITreeIndexFrame cursorFrame = treeIndex.getLeafFrameFactory().createFrame();
ITreeIndexCursor cursor = treeIndexHelper.createDiskOrderScanCursor(cursorFrame);
- ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory().createSearchOperationCallback(
- treeIndexHelper.getResourceID(), ctx);
- ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex.createAccessor(
- NoOpOperationCallback.INSTANCE, searchCallback);
- writer.open();
+ ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
+ .createSearchOperationCallback(treeIndexHelper.getResourceID(), ctx);
+ ITreeIndexAccessor indexAccessor = (ITreeIndexAccessor) treeIndex
+ .createAccessor(NoOpOperationCallback.INSTANCE, searchCallback);
try {
+ writer.open();
indexAccessor.diskOrderScan(cursor);
int fieldCount = treeIndex.getFieldCount();
FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
@@ -83,16 +83,21 @@
tb.getSize());
}
appender.flush(writer, true);
- } catch (Exception e) {
+ } catch (Throwable th) {
writer.fail();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
} finally {
- cursor.close();
- writer.close();
+ try {
+ cursor.close();
+ } catch (Exception cursorCloseException) {
+ throw new IllegalStateException(cursorCloseException);
+ } finally {
+ writer.close();
+ }
}
- } catch (Exception e) {
+ } catch (Throwable th) {
treeIndexHelper.close();
- throw new HyracksDataException(e);
+ throw new HyracksDataException(th);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index 4a9ea27..c91aff7 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -66,11 +66,11 @@
@Override
public void open() throws HyracksDataException {
+ writer.open();
accessor = new FrameTupleAccessor(inputRecDesc);
builder = new ArrayTupleBuilder(outputRecDesc.getFieldCount());
builderData = builder.getFieldData();
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- writer.open();
}
@Override
@@ -81,10 +81,9 @@
for (int i = 0; i < tupleCount; i++) {
short numTokens = 0;
- tokenizer.reset(
- accessor.getBuffer().array(),
- accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+ tokenizer.reset(accessor.getBuffer().array(), accessor.getTupleStartOffset(i)
+ + accessor.getFieldSlotsLength() + accessor.getFieldStartOffset(i, docField),
+ accessor.getFieldLength(i, docField));
if (addNumTokensKey) {
// Get the total number of tokens.
@@ -154,8 +153,11 @@
@Override
public void close() throws HyracksDataException {
- appender.flush(writer, true);
- writer.close();
+ try {
+ appender.flush(writer, true);
+ } finally {
+ writer.close();
+ }
}
@Override