First running prototype of pipeline of insert/delete ops with is-not-null filters. All tests pass, but there is still some room for optimization.
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix-fix-issue-9@275 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
index 179f97c..8543edd 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/IntroduceSecondaryIndexInsertDeleteRule.java
@@ -222,7 +222,7 @@
filterExpressions.add(new MutableObject<ILogicalExpression>(notFuncExpr));
}
Mutable<ILogicalExpression> filterExpression = null;
- if (filterExpressions.size() > 0) {
+ if (filterExpressions.size() > 1) {
filterExpression = new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
FunctionUtils.getFunctionInfo(AsterixBuiltinFunctions.AND), filterExpressions));
} else {
diff --git a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
index 77aa9fa..af4eefc 100644
--- a/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
+++ b/asterix-hyracks-glue/src/main/java/edu/uci/ics/asterix/runtime/transaction/TreeIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -34,6 +34,7 @@
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
@@ -51,6 +52,7 @@
private final IRecordDescriptorProvider recordDescProvider;
private final IndexOp op;
private final PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
+ private FrameTupleReference frameTuple;
private ByteBuffer writeBuffer;
private IIndexAccessor indexAccessor;
private ILockManager lockManager;
@@ -73,10 +75,6 @@
this.op = op;
tuple.setFieldPermutation(fieldPermutation);
this.txnContext = txnContext;
- ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
- if (tupleFilterFactory != null) {
- tupleFilter = tupleFilterFactory.createTupleFilter();
- }
AsterixAppRuntimeContext runtimeContext = (AsterixAppRuntimeContext) ctx.getJobletContext()
.getApplicationContext().getApplicationObject();
transactionProvider = runtimeContext.getTransactionProvider();
@@ -109,6 +107,11 @@
treeIndexHelper.init(false);
ITreeIndex treeIndex = (ITreeIndex) treeIndexHelper.getIndex();
indexAccessor = treeIndex.createAccessor();
+ ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
+ if (tupleFilterFactory != null) {
+ tupleFilter = tupleFilterFactory.createTupleFilter();
+ frameTuple = new FrameTupleReference();
+ }
initializeTransactionSupport();
} catch (Exception e) {
// cleanup in case of failure
@@ -125,11 +128,14 @@
byte[] resourceId = DataUtil.intToByteArray(fileId);
int tupleCount = accessor.getTupleCount();
try {
- for (int i = 0; i < tupleCount; i++) {
- tuple.reset(accessor, i);
- if (tupleFilter != null && !tupleFilter.accept(tuple)) {
- continue;
+ for (int i = 0; i < tupleCount; i++) {
+ if (tupleFilter != null) {
+ frameTuple.reset(accessor, i);
+ if (!tupleFilter.accept(frameTuple)) {
+ continue;
+ }
}
+ tuple.reset(accessor, i);
switch (op) {
case INSERT: {
lockManager.lock(txnContext, resourceId,
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 9d3e387..a67d0c3 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -41,6 +41,7 @@
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
+import edu.uci.ics.asterix.runtime.base.AsterixTupleFilterFactory;
import edu.uci.ics.asterix.runtime.transaction.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -49,6 +50,8 @@
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ILogicalExpressionJobGen;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
@@ -824,11 +827,12 @@
// TODO: Use filterExpr.
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
- IOperatorSchema propagatedSchema,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv,
List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
+ RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String datasetName = dataSourceIndex.getDataSource().getId()
.getDatasetName();
@@ -839,26 +843,29 @@
}
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
compiledDatasetDecl, indexName);
-
- if (cid.getKind() == IndexKind.BTREE)
+ AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
+ inputSchemas, typeEnv, filterExpr, context);
+ if (cid.getKind() == IndexKind.BTREE) {
return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, recordDesc, context, spec,
+ primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
IndexOp.INSERT);
- else
+ } else {
return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, recordDesc, context, spec,
+ primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
IndexOp.INSERT);
+ }
}
@Override
// TODO: Use filterExpr.
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex,
- IOperatorSchema propagatedSchema,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv,
List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr,
+ RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String datasetName = dataSourceIndex.getDataSource().getId()
.getDatasetName();
@@ -869,23 +876,39 @@
}
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(
compiledDatasetDecl, indexName);
- if (cid.getKind() == IndexKind.BTREE)
+ AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(
+ inputSchemas, typeEnv, filterExpr, context);
+ if (cid.getKind() == IndexKind.BTREE) {
return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, recordDesc, context, spec,
+ primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
IndexOp.DELETE);
- else
+ } else {
return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema,
- primaryKeys, secondaryKeys, recordDesc, context, spec,
+ primaryKeys, secondaryKeys, filterFactory, recordDesc, context, spec,
IndexOp.DELETE);
+ }
}
+ private AsterixTupleFilterFactory createTupleFilterFactory(
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
+ ILogicalExpression filterExpr, JobGenContext context)
+ throws AlgebricksException {
+ ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
+ IEvaluatorFactory filterEvalFactory = exprJobGen
+ .createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
+ context);
+ return new AsterixTupleFilterFactory(filterEvalFactory,
+ context.getBinaryBooleanInspector());
+ }
+
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(
String datasetName, String indexName,
IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec, IndexOp indexOp)
- throws AlgebricksException {
+ List<LogicalVariable> secondaryKeys,
+ AsterixTupleFilterFactory filterFactory,
+ RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
int numKeys = primaryKeys.size() + secondaryKeys.size();
// generate field permutations
int[] fieldPermutation = new int[numKeys];
@@ -956,7 +979,7 @@
appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories,
fieldPermutation, indexOp, new BTreeDataflowHelperFactory(),
- null, NoOpOperationCallbackProvider.INSTANCE, txnId);
+ filterFactory, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(
btreeBulkLoad, splitsAndConstraint.second);
}
@@ -965,9 +988,10 @@
String datasetName, String indexName,
IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec, IndexOp indexOp)
- throws AlgebricksException {
+ List<LogicalVariable> secondaryKeys,
+ AsterixTupleFilterFactory filterFactory,
+ RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
AqlCompiledDatasetDecl compiledDatasetDecl = metadata
.findDataset(datasetName);
String itemTypeName = compiledDatasetDecl.getItemTypeName();
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
index 7dd7ac1..8bb0b9f 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package edu.uci.ics.asterix.runtime.base;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java
new file mode 100644
index 0000000..ce5cefd
--- /dev/null
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009-2010 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.runtime.base;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
+import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
+
+public class AsterixTupleFilterFactory implements ITupleFilterFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final IBinaryBooleanInspector boolInspector;
+ private final IEvaluatorFactory evalFactory;
+
+ public AsterixTupleFilterFactory(IEvaluatorFactory evalFactory,
+ IBinaryBooleanInspector boolInspector) throws AlgebricksException {
+ this.evalFactory = evalFactory;
+ this.boolInspector = boolInspector;
+ }
+
+ @Override
+ public ITupleFilter createTupleFilter() throws Exception {
+ return new AsterixTupleFilter(evalFactory, boolInspector);
+ }
+
+}