[ASTERIXDB-3603][FUN] Runtime changes for transform functions

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- LangExpressionToPlanTranslator now resolves the unnest variable by
  walking down to the UNNEST operator instead of assuming it is the
  first live variable.
- Add getAllDatasets() to IMetadataManager, IMetadataNode,
  MetadataManager and MetadataProvider.
- Remove the redundant dataverse re-check when caching functions in
  MetadataManager.
- Expose accessors on ScanCollectionDescriptor, SubplanRuntimeFactory
  and UnnestRuntimeFactory, and open SubplanPushRuntime and the new
  UnnestPushRuntime for subclassing.
- Thread an ignoreFailures flag through PipelineAssembler,
  AbstractOneInputOneOutputOneFramePushRuntime and AbstractFrameAppender
  so transform pipelines can keep appending after a downstream frame
  failure (see the sketch below).
- Drop stale "-- Fail:" comments from the positive transform test DDLs.
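A minimal usage sketch for the new flag (illustrative only: the surrounding
variables -- pipeline, inputRecordDesc, outputRecordDesc, outputWriter, ctx --
are assumed to exist in the caller, as they do in SubplanRuntimeFactory):

    // Assemble the nested pipeline with ignoreFailures enabled. With the flag
    // set, AbstractFrameAppender.write() still resets its frame when the
    // downstream nextFrame() throws, and the push runtime's close() does not
    // mark the writer as failed if the final flush fails.
    PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1,
            inputRecordDesc, outputRecordDesc, /* ignoreFailures */ true);
    IFrameWriter head = pa.assemblePipeline(outputWriter, ctx);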
Ext-ref: MB-63039
Change-Id: I0b23d88db632d90aedec409db816c66a5a688daf
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19744
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
Reviewed-by: Ali Alsuliman <ali.al.solaiman@gmail.com>
Tested-by: Peeyush Gupta <peeyush.gupta@couchbase.com>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 3bb871b..6d37447 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -488,6 +488,16 @@
return isFileStore ? String.valueOf(ExternalWriterProvider.getSeparator(adapter)) : "";
}
+ private LogicalVariable getUnnestVar(ILogicalOperator op) {
+ while (op.getOperatorTag() != LogicalOperatorTag.UNNEST && !op.getInputs().isEmpty()) {
+ op = op.getInputs().get(0).getValue();
+ }
+ if (op.getOperatorTag() == LogicalOperatorTag.UNNEST) {
+ return ((UnnestOperator) op).getVariable();
+ }
+ return null;
+ }
+
public ILogicalPlan translate(Query expr, String outputDatasetName, ICompiledDmlStatement stmt,
ILogicalOperator baseOp, IResultMetadata resultMetadata) throws AlgebricksException {
MutableObject<ILogicalOperator> base = new MutableObject<>(new EmptyTupleSourceOperator());
@@ -500,8 +510,11 @@
ILogicalOperator topOp = p.first;
List<LogicalVariable> liveVars = new ArrayList<>();
VariableUtilities.getLiveVariables(topOp, liveVars);
- LogicalVariable unnestVar = liveVars.get(0);
- LogicalVariable resVar = unnestVar;
+ LogicalVariable unnestVar = getUnnestVar(topOp);
+ if (unnestVar == null) {
+ unnestVar = liveVars.get(0);
+ }
+ LogicalVariable resVar = liveVars.get(0);
if (outputDatasetName == null) {
FileSplit outputFileSplit = metadataProvider.getOutputFile();
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
index 0ab3ace..f03a74d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.001.ddl.sqlpp
@@ -18,8 +18,6 @@
*/
use test;
--- Fail: More than one argument
-
CREATE TRANSFORM FUNCTION transformTest1(doc) {
SELECT count(*) as count FROM [doc] t UNNEST t.a
};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
index 6d55991..a04ea00 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.002.ddl.sqlpp
@@ -18,8 +18,6 @@
*/
use test;
--- Fail: Less than one argument
-
CREATE TRANSFORM FUNCTION transformTest2(doc) {
SELECT (SELECT * FROM [doc] t UNNEST t.a) as tt
};
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
index c38bd72..0770d98 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.003.ddl.sqlpp
@@ -18,8 +18,6 @@
*/
use test;
--- Fail: Using a collection in the definition
-
CREATE TRANSFORM FUNCTION transformTest3(doc) {
SELECT * FROM [doc] t UNNEST t.a LIMIT 1
};
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
index 10cfb4e..d2c2265 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/function/transform/positive/transform.004.ddl.sqlpp
@@ -18,8 +18,6 @@
*/
use test;
--- Fail: Using a view in the definition
-
CREATE TRANSFORM FUNCTION transformTest4(cust) {
SELECT VALUE c
FROM [cust] AS c
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
index 25d43b2..a60e53b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java
@@ -689,12 +689,6 @@
// in the cache.
return null;
}
- //TODO(DB): review this and other similar ones
- if (ctx.getDataverse(functionSignature.getDatabaseName(), functionSignature.getDataverseName()) != null) {
- // This transaction has dropped and subsequently created the same
- // dataverse.
- return null;
- }
function = cache.getFunction(functionSignature);
if (function != null) {
// Function is already in the cache, don't add it again.
@@ -1348,6 +1342,15 @@
INSTANCE = new NCMetadataManagerImpl(proxies, metadataNode);
}
+ @Override
+ public List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException {
+ try {
+ return metadataNode.getAllDatasets(ctx.getTxnId());
+ } catch (RemoteException e) {
+ throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e);
+ }
+ }
+
private static class CCMetadataManagerImpl extends MetadataManager {
private final MetadataProperties metadataProperties;
private final ICcApplicationContext appCtx;
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
index fdd36de..4ce8742 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java
@@ -919,4 +919,6 @@
String feedName) throws AlgebricksException;
long getMaxTxnId();
+
+ List<Dataset> getAllDatasets(MetadataTransactionContext ctx) throws AlgebricksException;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index ee4a2c9..bc6789c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -1017,4 +1017,6 @@
List<FeedConnection> getFeedConnections(TxnId txnId, String database, DataverseName dataverseName, String feedName)
throws AlgebricksException, RemoteException;
+
+ List<Dataset> getAllDatasets(TxnId txnId) throws AlgebricksException, RemoteException;
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f11a338..63eda21 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -2033,4 +2033,8 @@
off += Character.charCount(codePointChar);
}
}
+
+ public List<Dataset> getAllDatasets() throws AlgebricksException {
+ return MetadataManager.INSTANCE.getAllDatasets(mdTxnCtx);
+ }
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
index 1701e64..005bdf5 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/unnestingfunctions/std/ScanCollectionDescriptor.java
@@ -69,6 +69,18 @@
this.funID = funID;
}
+ public IScalarEvaluatorFactory getListEvalFactory() {
+ return listEvalFactory;
+ }
+
+ public SourceLocation getSourceLoc() {
+ return sourceLoc;
+ }
+
+ public FunctionIdentifier getFunID() {
+ return funID;
+ }
+
@Override
public IUnnestingEvaluator createUnnestingEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
return new IUnnestingEvaluator() {
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 0c74260..9b643cd 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.dataflow.common.comm.io.AbstractFrameAppender;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -38,10 +39,12 @@
protected IFrame frame;
protected FrameTupleAccessor tAccess;
protected FrameTupleReference tRef;
+ protected boolean ignoreFailures = false;
protected final void initAccessAppend(IHyracksTaskContext ctx) throws HyracksDataException {
frame = new VSizeFrame(ctx);
appender = new FrameTupleAppender(frame);
+ ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
tAccess = new FrameTupleAccessor(inputRecordDesc);
}
@@ -59,8 +62,10 @@
try {
flushIfNotFailed();
} catch (Exception e) {
- closeException = e;
- fail(closeException);
+ if (!ignoreFailures) {
+ closeException = e;
+ fail(closeException);
+ }
} finally {
closeException = CleanupUtils.close(writer, closeException);
}
@@ -115,4 +120,15 @@
throws HyracksDataException {
FrameUtils.appendConcatToWriter(writer, getTupleAppender(), accessor0, tIndex0, accessor1, tIndex1);
}
+
+ public void setIgnoreFailures(boolean ignoreFailures) {
+ this.ignoreFailures = ignoreFailures;
+ if (appender != null) {
+ ((AbstractFrameAppender) appender).setIgnoreFailures(ignoreFailures);
+ }
+ }
+
+ public boolean isIgnoreFailures() {
+ return ignoreFailures;
+ }
}
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index 202c087..c2e8473 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -26,6 +26,7 @@
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.ProfiledPushRuntime;
+import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -46,15 +47,23 @@
private final int outputArity;
private final AlgebricksPipeline pipeline;
private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
+ private final boolean ignoreFailures;
public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor) {
+ this(pipeline, inputArity, outputArity, pipelineInputRecordDescriptor, pipelineOutputRecordDescriptor, false);
+ }
+
+ public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int outputArity,
+ RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor pipelineOutputRecordDescriptor,
+ boolean ignoreFailures) {
this.pipeline = pipeline;
this.pipelineInputRecordDescriptor = pipelineInputRecordDescriptor;
this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
this.inputArity = inputArity;
this.outputArity = outputArity;
this.runtimeMap = new HashMap<>();
+ this.ignoreFailures = ignoreFailures;
}
public IFrameWriter assemblePipeline(IFrameWriter writer, IHyracksTaskContext ctx) throws HyracksDataException {
@@ -99,6 +108,9 @@
} else {
newRuntimes[j].setOutputFrameWriter(0, start, recordDescriptors[i]);
}
+ if (newRuntimes[j] instanceof AbstractOneInputOneOutputOneFramePushRuntime) {
+ ((AbstractOneInputOneOutputOneFramePushRuntime) newRuntimes[j]).setIgnoreFailures(ignoreFailures);
+ }
}
runtimeMap.put(runtimeFactory, newRuntimes);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index 7feca3c..242c603 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -90,6 +90,22 @@
return pipelines;
}
+ public RecordDescriptor getInputRecordDesc() {
+ return inputRecordDesc;
+ }
+
+ public RecordDescriptor getOutputRecordDesc() {
+ return outputRecordDesc;
+ }
+
+ public IMissingWriterFactory[] getMissingWriterFactories() {
+ return missingWriterFactories;
+ }
+
+ public Map<IPushRuntimeFactory, IOperatorStats> getStats() {
+ return stats;
+ }
+
public void setStats(Map<IPushRuntimeFactory, IOperatorStats> stats) {
this.stats.putAll(stats);
}
@@ -97,20 +113,20 @@
@Override
public AbstractOneInputOneOutputPushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- return new SubplanPushRuntime(ctx);
+ return new SubplanPushRuntime(ctx, false);
}
- private class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+ public class SubplanPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
- final IHyracksTaskContext ctx;
+ protected final IHyracksTaskContext ctx;
- final NestedTupleSourceRuntime[] startOfPipelines;
+ protected final NestedTupleSourceRuntime[] startOfPipelines;
boolean first;
boolean profile;
- SubplanPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ protected SubplanPushRuntime(IHyracksTaskContext ctx, boolean ignoreFailures) throws HyracksDataException {
this.ctx = ctx;
this.profile = ctx.getJobFlags().contains(JobFlag.PROFILE_RUNTIME);
this.first = true;
@@ -150,7 +166,8 @@
outputRecordDescriptor = pipelineLastRecordDescriptor;
}
- PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor);
+ PipelineAssembler pa =
+ new PipelineAssembler(pipeline, 1, 1, inputRecordDesc, outputRecordDescriptor, ignoreFailures);
IFrameWriter head = pa.assemblePipeline(outputWriter, ctx, stats);
startOfPipelines[i] = (NestedTupleSourceRuntime) head;
pipelineAssemblers[i] = pa;
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
index 0874f2b..9cfc070 100644
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/UnnestRuntimeFactory.java
@@ -52,8 +52,8 @@
private static final long serialVersionUID = 1L;
- private final int outCol;
- private final int positionalCol;
+ protected final int outCol;
+ protected final int positionalCol;
private final IUnnestingEvaluatorFactory unnestingFactory;
private final IUnnestingPositionWriterFactory positionWriterFactory;
private final boolean leftOuter;
@@ -76,6 +76,26 @@
this.missingWriterFactory = missingWriterFactory;
}
+ public int[] getProjectionList() {
+ return projectionList;
+ }
+
+ public int getOutCol() {
+ return outCol;
+ }
+
+ public int getPositionalCol() {
+ return positionalCol;
+ }
+
+ public IUnnestingEvaluatorFactory getUnnestingFactory() {
+ return unnestingFactory;
+ }
+
+ public IUnnestingPositionWriterFactory getPositionWriterFactory() {
+ return positionWriterFactory;
+ }
+
@Override
public String toString() {
return "unnest " + outCol + (positionalCol >= 0 ? " at " + positionalCol : "") + " <- " + unnestingFactory
@@ -85,92 +105,102 @@
@Override
public AbstractOneInputOneOutputOneFramePushRuntime createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- ByteArrayAccessibleOutputStream missingBytes = leftOuter ? writeMissingBytes() : null;
- IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
- private IPointable p = VoidPointable.FACTORY.createPointable();
- private ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(projectionList.length);
- private IUnnestingEvaluator unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
- private final IUnnestingPositionWriter positionWriter =
- positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
-
- @Override
- public void open() throws HyracksDataException {
- super.open();
- if (tRef == null) {
- initAccessAppendRef(ctx);
- }
- }
-
- @Override
- public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- try {
- unnest.init(tRef);
- unnesting(t);
- } catch (IOException ae) {
- throw HyracksDataException.create(ae);
- }
- }
- }
-
- private void unnesting(int t) throws IOException {
- // Assumes that when unnesting the tuple, each step() call for each element
- // in the tuple will increase the positionIndex, and the positionIndex will
- // be reset when a new tuple is to be processed.
- int positionIndex = 1;
- boolean emitted = false;
- do {
- if (!unnest.step(p)) {
- break;
- }
- writeOutput(t, positionIndex++, false);
- emitted = true;
- } while (true);
- if (leftOuter && !emitted) {
- writeOutput(t, -1, true);
- }
- }
-
- private void writeOutput(int t, int positionIndex, boolean missing)
- throws HyracksDataException, IOException {
- tupleBuilder.reset();
- for (int f = 0; f < projectionList.length; f++) {
- int col = projectionList[f];
- if (col == outCol) {
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
- } else {
- tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
- }
- } else if (col == positionalCol) {
- if (missing) {
- tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
- } else {
- positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
- tupleBuilder.addFieldEndOffset();
- }
- } else {
- tupleBuilder.addField(tAccess, t, projectionList[f]);
- }
- }
- appendToFrameFromTupleBuilder(tupleBuilder);
- }
-
- @Override
- public void flush() throws HyracksDataException {
- appender.flush(writer);
- }
- };
+ return new UnnestPushRuntime(ctx);
}
- private ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
+ public class UnnestPushRuntime extends AbstractOneInputOneOutputOneFramePushRuntime {
+ private IPointable p = VoidPointable.FACTORY.createPointable();
+ protected ArrayTupleBuilder tupleBuilder;
+ protected IUnnestingEvaluator unnest;
+ private final IUnnestingPositionWriter positionWriter;
+ private final IHyracksTaskContext ctx;
+ protected ByteArrayAccessibleOutputStream missingBytes;
+
+ public UnnestPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
+ this.ctx = ctx;
+ IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+ unnest = unnestingFactory.createUnnestingEvaluator(evalCtx);
+ tupleBuilder = new ArrayTupleBuilder(projectionList.length);
+ positionWriter =
+ positionWriterFactory != null ? positionWriterFactory.createUnnestingPositionWriter() : null;
+ missingBytes = leftOuter ? writeMissingBytes() : null;
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ super.open();
+ if (tRef == null) {
+ initAccessAppendRef(ctx);
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ try {
+ unnest.init(tRef);
+ unnesting(t);
+ } catch (IOException ae) {
+ throw HyracksDataException.create(ae);
+ }
+ }
+ }
+
+ protected void unnesting(int t) throws IOException {
+ // Assumes that when unnesting the tuple, each step() call for each element
+ // in the tuple will increase the positionIndex, and the positionIndex will
+ // be reset when a new tuple is to be processed.
+ int positionIndex = 1;
+ boolean emitted = false;
+ do {
+ if (!unnest.step(p)) {
+ break;
+ }
+ writeOutput(t, positionIndex++, false);
+ emitted = true;
+ } while (true);
+ if (leftOuter && !emitted) {
+ writeOutput(t, -1, true);
+ }
+ }
+
+ private void writeOutput(int t, int positionIndex, boolean missing) throws HyracksDataException, IOException {
+ tupleBuilder.reset();
+ for (int f = 0; f < projectionList.length; f++) {
+ int col = projectionList[f];
+ if (col == outCol) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+ } else {
+ tupleBuilder.addField(p.getByteArray(), p.getStartOffset(), p.getLength());
+ }
+ } else if (col == positionalCol) {
+ if (missing) {
+ tupleBuilder.addField(missingBytes.getByteArray(), 0, missingBytes.size());
+ } else {
+ positionWriter.write(tupleBuilder.getDataOutput(), positionIndex);
+ tupleBuilder.addFieldEndOffset();
+ }
+ } else {
+ tupleBuilder.addField(tAccess, t, projectionList[f]);
+ }
+ }
+ appendToFrameFromTupleBuilder(tupleBuilder);
+ }
+
+ @Override
+ public void flush() throws HyracksDataException {
+ appender.flush(writer);
+ }
+ };
+
+ protected ByteArrayAccessibleOutputStream writeMissingBytes() throws HyracksDataException {
ByteArrayAccessibleOutputStream baos = new ByteArrayAccessibleOutputStream();
IMissingWriter missingWriter = missingWriterFactory.createMissingWriter();
missingWriter.writeMissing(new DataOutputStream(baos));
return baos;
}
-}
+}
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e1acaee..7338cc7 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -48,6 +48,7 @@
protected int tupleCount;
protected int tupleDataEndOffset;
+ protected boolean ignoreFailures = false;
@Override
public void reset(IFrame frame, boolean clear) throws HyracksDataException {
@@ -91,10 +92,21 @@
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
failIfInterrupted();
getBuffer().clear();
- outWriter.nextFrame(getBuffer());
- if (clearFrame) {
- frame.reset();
- reset(getBuffer(), true);
+ if (!ignoreFailures) {
+ outWriter.nextFrame(getBuffer());
+ if (clearFrame) {
+ frame.reset();
+ reset(getBuffer(), true);
+ }
+ } else {
+ try {
+ outWriter.nextFrame(getBuffer());
+ } finally {
+ if (clearFrame) {
+ frame.reset();
+ reset(getBuffer(), true);
+ }
+ }
}
}
@@ -125,4 +137,12 @@
throw HyracksDataException.create(new InterruptedException());
}
}
+
+ public void setIgnoreFailures(boolean ignoreFailures) {
+ this.ignoreFailures = ignoreFailures;
+ }
+
+ public boolean isIgnoreFailures() {
+ return ignoreFailures;
+ }
}
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index 3ef8b28..58194f4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -299,5 +299,4 @@
}
return false;
}
-
}