Add Support for Upsert Operation
This change adds support for upsert operations. it includes
creating a primary and secondary upsert operators in addition
to adding a new function "before" to the index operation call
back to correctly perform locking for the upsert operation.
Change-Id: I2705f43b6e6d187ee29b9ba5a7946d422990022a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/476
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 5388092..7e9da44 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -29,9 +29,9 @@
EXTENSION_OPERATOR,
EXTERNAL_LOOKUP,
GROUP,
- INDEX_INSERT_DELETE,
+ INDEX_INSERT_DELETE_UPSERT,
INNERJOIN,
- INSERT_DELETE,
+ INSERT_DELETE_UPSERT,
LEFTOUTERJOIN,
LIMIT,
MATERIALIZE,
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 53857d2..823ebae 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -48,13 +48,13 @@
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
- throws AlgebricksException;
+ throws AlgebricksException;
public boolean scannerOperatorIsLeaf(IDataSource<S> dataSource);
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException;
+ throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
@@ -63,7 +63,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, List<LogicalVariable> keys, LogicalVariable payLoadVar,
List<LogicalVariable> additionalNonKeyFields, JobGenContext context, JobSpecification jobSpec)
- throws AlgebricksException;
+ throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -193,4 +193,16 @@
public IFunctionInfo lookupFunction(FunctionIdentifier fid);
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<S> dataSource,
+ IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, ILogicalExpression filterExpr,
+ List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKeys,
+ RecordDescriptor inputDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException;
+
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
similarity index 72%
rename from algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
rename to algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
index e9e3b01..c06cc18 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteUpsertOperator.java
@@ -22,21 +22,19 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-public class IndexInsertDeleteOperator extends AbstractLogicalOperator {
+public class IndexInsertDeleteUpsertOperator extends AbstractLogicalOperator {
private final IDataSourceIndex<?, ?> dataSourceIndex;
private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
@@ -48,8 +46,11 @@
private final Kind operation;
private final boolean bulkload;
private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+ // used for upsert operations
+ private List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs;
+ private Mutable<ILogicalExpression> prevAdditionalFilteringExpression;
- public IndexInsertDeleteOperator(IDataSourceIndex<?, ?> dataSourceIndex,
+ public IndexInsertDeleteUpsertOperator(IDataSourceIndex<?, ?> dataSourceIndex,
List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
this.dataSourceIndex = dataSourceIndex;
@@ -69,22 +70,44 @@
@Override
public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
boolean b = false;
+ // Primary
for (int i = 0; i < primaryKeyExprs.size(); i++) {
if (visitor.transform(primaryKeyExprs.get(i))) {
b = true;
}
}
+ // Secondary
for (int i = 0; i < secondaryKeyExprs.size(); i++) {
if (visitor.transform(secondaryKeyExprs.get(i))) {
b = true;
}
}
+ // Additional Filtering <For upsert>
+ if (additionalFilteringExpressions != null) {
+ for (int i = 0; i < additionalFilteringExpressions.size(); i++) {
+ if (visitor.transform(additionalFilteringExpressions.get(i))) {
+ b = true;
+ }
+ }
+ }
+ // Old secondary <For upsert>
+ if (prevSecondaryKeyExprs != null) {
+ for (int i = 0; i < prevSecondaryKeyExprs.size(); i++) {
+ if (visitor.transform(prevSecondaryKeyExprs.get(i))) {
+ b = true;
+ }
+ }
+ }
+ // Old Filtering <For upsert>
+ if (prevAdditionalFilteringExpression != null) {
+ visitor.transform(prevAdditionalFilteringExpression);
+ }
return b;
}
@Override
public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
- return visitor.visitIndexInsertDeleteOperator(this, arg);
+ return visitor.visitIndexInsertDeleteUpsertOperator(this, arg);
}
@Override
@@ -99,7 +122,7 @@
@Override
public LogicalOperatorTag getOperatorTag() {
- return LogicalOperatorTag.INDEX_INSERT_DELETE;
+ return LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT;
}
@Override
@@ -143,4 +166,19 @@
return additionalFilteringExpressions;
}
+ public List<Mutable<ILogicalExpression>> getPrevSecondaryKeyExprs() {
+ return prevSecondaryKeyExprs;
+ }
+
+ public void setPrevSecondaryKeyExprs(List<Mutable<ILogicalExpression>> prevSecondaryKeyExprs) {
+ this.prevSecondaryKeyExprs = prevSecondaryKeyExprs;
+ }
+
+ public Mutable<ILogicalExpression> getPrevAdditionalFilteringExpression() {
+ return prevAdditionalFilteringExpression;
+ }
+
+ public void setPrevAdditionalFilteringExpression(Mutable<ILogicalExpression> prevAdditionalFilteringExpression) {
+ this.prevAdditionalFilteringExpression = prevAdditionalFilteringExpression;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
deleted file mode 100644
index b95d279..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-
-public class InsertDeleteOperator extends AbstractLogicalOperator {
-
- public enum Kind {
- INSERT,
- DELETE
- }
-
- private final IDataSource<?> dataSource;
-
- private final Mutable<ILogicalExpression> payloadExpr;
-
- private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
-
- private final Kind operation;
- private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
-
- private final boolean bulkload;
-
- public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
- List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
- this.dataSource = dataSource;
- this.payloadExpr = payloadExpr;
- this.primaryKeyExprs = primaryKeyExprs;
- this.operation = operation;
- this.bulkload = bulkload;
- }
-
- @Override
- public void recomputeSchema() throws AlgebricksException {
- schema = new ArrayList<LogicalVariable>();
- schema.addAll(inputs.get(0).getValue().getSchema());
- }
-
- @Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
- boolean changed = false;
- changed = transform.transform(payloadExpr);
- for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
- changed |= transform.transform(e);
- }
- return changed;
- }
-
- @Override
- public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
- return visitor.visitInsertDeleteOperator(this, arg);
- }
-
- @Override
- public boolean isMap() {
- return false;
- }
-
- @Override
- public VariablePropagationPolicy getVariablePropagationPolicy() {
- return VariablePropagationPolicy.ALL;
- }
-
- @Override
- public LogicalOperatorTag getOperatorTag() {
- return LogicalOperatorTag.INSERT_DELETE;
- }
-
- @Override
- public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- return createPropagatingAllInputsTypeEnvironment(ctx);
- }
-
- public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
- return primaryKeyExprs;
- }
-
- public IDataSource<?> getDataSource() {
- return dataSource;
- }
-
- public Mutable<ILogicalExpression> getPayloadExpression() {
- return payloadExpr;
- }
-
- public Kind getOperation() {
- return operation;
- }
-
- public boolean isBulkload() {
- return bulkload;
- }
-
- public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
- this.additionalFilteringExpressions = additionalFilteringExpressions;
- }
-
- public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
- return additionalFilteringExpressions;
- }
-
-}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
new file mode 100644
index 0000000..607db69
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteUpsertOperator.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class InsertDeleteUpsertOperator extends AbstractLogicalOperator {
+
+ public enum Kind {
+ INSERT,
+ DELETE,
+ UPSERT
+ }
+
+ private final IDataSource<?> dataSource;
+ private final Mutable<ILogicalExpression> payloadExpr;
+ private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+ private final Kind operation;
+ private final boolean bulkload;
+ private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+ private LogicalVariable prevRecordVar;
+ private Object prevRecordType;
+ private LogicalVariable prevFilterVar;
+ private Object prevFilterType;
+
+ public InsertDeleteUpsertOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
+ List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
+ this.dataSource = dataSource;
+ this.payloadExpr = payloadExpr;
+ this.primaryKeyExprs = primaryKeyExprs;
+ this.operation = operation;
+ this.bulkload = bulkload;
+ }
+
+ @Override
+ public void recomputeSchema() throws AlgebricksException {
+ schema = new ArrayList<LogicalVariable>();
+ schema.addAll(inputs.get(0).getValue().getSchema());
+ if (operation == Kind.UPSERT) {
+ // The upsert case also produces the previous record
+ schema.add(prevRecordVar);
+ if (prevFilterVar != null) {
+ schema.add(prevFilterVar);
+ }
+ }
+ }
+
+ public void getProducedVariables(Collection<LogicalVariable> producedVariables) {
+ if (prevRecordVar != null) {
+ producedVariables.add(prevRecordVar);
+ }
+ if (prevFilterVar != null) {
+ producedVariables.add(prevFilterVar);
+ }
+ }
+
+ @Override
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+ throws AlgebricksException {
+ boolean changed = false;
+ changed = transform.transform(payloadExpr);
+ for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
+ changed |= transform.transform(e);
+ }
+ if (additionalFilteringExpressions != null) {
+ for (Mutable<ILogicalExpression> e : additionalFilteringExpressions) {
+ changed |= transform.transform(e);
+ }
+ }
+ return changed;
+ }
+
+ @Override
+ public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+ return visitor.visitInsertDeleteUpsertOperator(this, arg);
+ }
+
+ @Override
+ public boolean isMap() {
+ return false;
+ }
+
+ @Override
+ public VariablePropagationPolicy getVariablePropagationPolicy() {
+ return new VariablePropagationPolicy() {
+ @Override
+ public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+ throws AlgebricksException {
+ target.addAllVariables(sources[0]);
+ if (operation == Kind.UPSERT) {
+ target.addVariable(prevRecordVar);
+ if (prevFilterVar != null) {
+ target.addVariable(prevFilterVar);
+ }
+ }
+ }
+ };
+ }
+
+ @Override
+ public LogicalOperatorTag getOperatorTag() {
+ return LogicalOperatorTag.INSERT_DELETE_UPSERT;
+ }
+
+ @Override
+ public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+ PropagatingTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+ if (operation == Kind.UPSERT) {
+ env.setVarType(prevRecordVar, prevRecordType);
+ if (prevFilterVar != null) {
+ env.setVarType(prevFilterVar, prevFilterType);
+ }
+ }
+ return env;
+ }
+
+ public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
+ return primaryKeyExprs;
+ }
+
+ public IDataSource<?> getDataSource() {
+ return dataSource;
+ }
+
+ public Mutable<ILogicalExpression> getPayloadExpression() {
+ return payloadExpr;
+ }
+
+ public Kind getOperation() {
+ return operation;
+ }
+
+ public boolean isBulkload() {
+ return bulkload;
+ }
+
+ public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
+ this.additionalFilteringExpressions = additionalFilteringExpressions;
+ }
+
+ public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
+ return additionalFilteringExpressions;
+ }
+
+ public LogicalVariable getPrevRecordVar() {
+ return prevRecordVar;
+ }
+
+ public void setPrevRecordVar(LogicalVariable prevRecordVar) {
+ this.prevRecordVar = prevRecordVar;
+ }
+
+ public void setPrevRecordType(Object recordType) {
+ prevRecordType = recordType;
+ }
+
+ public LogicalVariable getPrevFilterVar() {
+ return prevFilterVar;
+ }
+
+ public void setPrevFilterVar(LogicalVariable prevFilterVar) {
+ this.prevFilterVar = prevFilterVar;
+ }
+
+ public Object getPrevFilterType() {
+ return prevFilterType;
+ }
+
+ public void setPrevFilterType(Object prevFilterType) {
+ this.prevFilterType = prevFilterType;
+ }
+}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 2ba4969..343ace8 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -22,7 +22,6 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
@@ -62,7 +61,8 @@
}
@Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+ throws AlgebricksException {
return false;
}
@@ -73,7 +73,7 @@
@Override
public boolean isMap() {
- return false;
+ return true;
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
index e1a793c..c69ead7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
@@ -29,7 +29,7 @@
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 25993bb..b2c97c3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -56,9 +56,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -509,14 +509,14 @@
}
@Override
- public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext ctx)
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, IOptimizationContext ctx)
throws AlgebricksException {
propagateFDsAndEquivClasses(op, ctx);
return null;
}
@Override
- public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext ctx)
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, IOptimizationContext ctx)
throws AlgebricksException {
propagateFDsAndEquivClasses(op, ctx);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index dc535ea..7a4e7e1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -45,9 +45,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -454,11 +454,11 @@
}
@Override
- public Boolean visitInsertDeleteOperator(InsertDeleteOperator op, ILogicalOperator arg) throws AlgebricksException {
+ public Boolean visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
- if (aop.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE)
+ if (aop.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE_UPSERT)
return Boolean.FALSE;
- InsertDeleteOperator insertOpArg = (InsertDeleteOperator) copyAndSubstituteVar(op, arg);
+ InsertDeleteUpsertOperator insertOpArg = (InsertDeleteUpsertOperator) copyAndSubstituteVar(op, arg);
boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
if (!op.getDataSource().equals(insertOpArg.getDataSource()))
isomorphic = false;
@@ -468,12 +468,12 @@
}
@Override
- public Boolean visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, ILogicalOperator arg)
+ public Boolean visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, ILogicalOperator arg)
throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
- if (aop.getOperatorTag() != LogicalOperatorTag.INDEX_INSERT_DELETE)
+ if (aop.getOperatorTag() != LogicalOperatorTag.INDEX_INSERT_DELETE_UPSERT)
return Boolean.FALSE;
- IndexInsertDeleteOperator insertOpArg = (IndexInsertDeleteOperator) copyAndSubstituteVar(op, arg);
+ IndexInsertDeleteUpsertOperator insertOpArg = (IndexInsertDeleteUpsertOperator) copyAndSubstituteVar(op, arg);
boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
if (!op.getDataSourceIndex().equals(insertOpArg.getDataSourceIndex()))
isomorphic = false;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 376c2bc..c46ffde 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -46,9 +46,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -251,13 +251,13 @@
}
@Override
- public Void visitInsertDeleteOperator(InsertDeleteOperator op, ILogicalOperator arg) throws AlgebricksException {
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
}
@Override
- public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, ILogicalOperator arg)
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, ILogicalOperator arg)
throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8e378bc..4e5b13e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -37,9 +37,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -259,14 +259,14 @@
}
@Override
- public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext arg)
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, IOptimizationContext arg)
throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
@Override
- public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext arg)
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, IOptimizationContext arg)
throws AlgebricksException {
// TODO Auto-generated method stub
return null;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 5cf30c7..2e402fc 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -42,9 +42,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -266,12 +266,12 @@
}
@Override
- public ILogicalOperator visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+ public ILogicalOperator visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
List<Mutable<ILogicalExpression>> newKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newKeyExpressions, op.getPrimaryKeyExpressions());
List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
- InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
+ InsertDeleteUpsertOperator insertDeleteOp = new InsertDeleteUpsertOperator(op.getDataSource(),
deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(),
op.isBulkload());
insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
@@ -279,7 +279,7 @@
}
@Override
- public ILogicalOperator visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg)
+ public ILogicalOperator visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
throws AlgebricksException {
List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
@@ -289,7 +289,7 @@
((AbstractLogicalExpression) op.getFilterExpression()).cloneExpression());
List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
- IndexInsertDeleteOperator indexInsertDeleteOp = new IndexInsertDeleteOperator(op.getDataSourceIndex(),
+ IndexInsertDeleteUpsertOperator indexInsertDeleteOp = new IndexInsertDeleteUpsertOperator(op.getDataSourceIndex(),
newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation(),
op.isBulkload());
indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index da5d2b2..8df772b 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -23,7 +23,6 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -42,9 +41,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -241,12 +240,14 @@
}
@Override
- public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
+ op.getProducedVariables(producedVariables);
return null;
}
@Override
- public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) throws AlgebricksException {
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg)
+ throws AlgebricksException {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index dc89c12..b488df1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -42,9 +42,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -283,13 +283,13 @@
}
@Override
- public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) throws AlgebricksException {
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
}
@Override
- public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) throws AlgebricksException {
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 4b791c1..91ff073 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -40,9 +40,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -388,7 +388,7 @@
}
@Override
- public Void visitInsertDeleteOperator(InsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
@@ -399,7 +399,7 @@
}
@Override
- public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op,
Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
e.getValue().substituteVar(pair.first, pair.second);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index e82b4f7..cfc57c2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -40,9 +40,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -344,11 +344,15 @@
}
@Override
- public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) {
+ public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Void arg) {
+ //1. The record variable
op.getPayloadExpression().getValue().getUsedVariables(usedVariables);
+
+ //2. The primary key variables
for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
e.getValue().getUsedVariables(usedVariables);
}
+ //3. The filters variables
if (op.getAdditionalFilteringExpressions() != null) {
for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
e.getValue().getUsedVariables(usedVariables);
@@ -358,7 +362,7 @@
}
@Override
- public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) {
+ public Void visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void arg) {
for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
e.getValue().getUsedVariables(usedVariables);
}
@@ -370,6 +374,14 @@
e.getValue().getUsedVariables(usedVariables);
}
}
+ if (op.getPrevAdditionalFilteringExpression() != null) {
+ op.getPrevAdditionalFilteringExpression().getValue().getUsedVariables(usedVariables);
+ }
+ if (op.getPrevSecondaryKeyExprs() != null) {
+ for (Mutable<ILogicalExpression> e : op.getPrevSecondaryKeyExprs()) {
+ e.getValue().getUsedVariables(usedVariables);
+ }
+ }
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 0352f83..28f783d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -34,18 +34,43 @@
public class VariableUtilities {
+ /***
+ * Adds the used variables in the logical operator to the list of used variables
+ *
+ * @param op
+ * The target operator
+ * @param usedVariables
+ * A list to be filled with variables used in the logical operator op.
+ * @throws AlgebricksException
+ */
public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
throws AlgebricksException {
ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
op.accept(visitor, null);
}
+ /**
+ * Adds the variables produced in the logical operator in the list of produced variables
+ * @param op
+ * The target operator
+ * @param producedVariables
+ * The variables produced in the logical operator
+ * @throws AlgebricksException
+ */
public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
throws AlgebricksException {
ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
op.accept(visitor, null);
}
+ /**
+ * Adds the variables that are live after the execution of this operator to the list of schema variables.
+ * @param op
+ * The target logical operator
+ * @param schemaVariables
+ * The list of live variables. The output of the operator and the propagated outputs of its children
+ * @throws AlgebricksException
+ */
public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
throws AlgebricksException {
ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
index 449c04f..036ac05 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -35,8 +35,8 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
@@ -92,7 +92,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+ InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
assert insertDeleteOp.getOperation() == Kind.INSERT;
assert insertDeleteOp.isBulkload();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
index 15144ab..b837bfa 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -38,8 +38,8 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
@@ -117,7 +117,7 @@
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op;
+ IndexInsertDeleteUpsertOperator indexInsertDeleteOp = (IndexInsertDeleteUpsertOperator) op;
assert indexInsertDeleteOp.getOperation() == Kind.INSERT;
assert indexInsertDeleteOp.isBulkload();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
similarity index 76%
rename from algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
rename to algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
index 29673b4..4702361 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeleteUpsertPOperator.java
@@ -22,7 +22,6 @@
import java.util.List;
import org.apache.commons.lang3.mutable.Mutable;
-
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -37,29 +36,31 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.JobSpecification;
-public class IndexInsertDeletePOperator extends AbstractPhysicalOperator {
+public class IndexInsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
private final List<LogicalVariable> primaryKeys;
private final List<LogicalVariable> secondaryKeys;
private final ILogicalExpression filterExpr;
private final IDataSourceIndex<?, ?> dataSourceIndex;
private final List<LogicalVariable> additionalFilteringKeys;
+ private final List<LogicalVariable> prevSecondaryKeys;
+ private final LogicalVariable prevAdditionalFilteringKey;
- public IndexInsertDeletePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ public IndexInsertDeleteUpsertPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
- IDataSourceIndex<?, ?> dataSourceIndex) {
+ IDataSourceIndex<?, ?> dataSourceIndex, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKey) {
this.primaryKeys = primaryKeys;
this.secondaryKeys = secondaryKeys;
if (filterExpr != null) {
@@ -69,6 +70,8 @@
}
this.dataSourceIndex = dataSourceIndex;
this.additionalFilteringKeys = additionalFilteringKeys;
+ this.prevSecondaryKeys = prevSecondaryKeys;
+ this.prevAdditionalFilteringKey = prevAdditionalFilteringKey;
}
@Override
@@ -79,7 +82,7 @@
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
}
@Override
@@ -100,8 +103,8 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- IndexInsertDeleteOperator insertDeleteOp = (IndexInsertDeleteOperator) op;
+ throws AlgebricksException {
+ IndexInsertDeleteUpsertOperator insertDeleteUpsertOp = (IndexInsertDeleteUpsertOperator) op;
IMetadataProvider mp = context.getMetadataProvider();
JobSpecification spec = builder.getJobSpec();
@@ -109,18 +112,22 @@
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
- IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp);
- if (insertDeleteOp.getOperation() == Kind.INSERT) {
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteUpsertOp);
+ if (insertDeleteUpsertOp.getOperation() == Kind.INSERT) {
runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
- } else {
+ } else if (insertDeleteUpsertOp.getOperation() == Kind.DELETE) {
runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+ } else if (insertDeleteUpsertOp.getOperation() == Kind.UPSERT) {
+ runtimeAndConstraints = mp.getIndexUpsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevSecondaryKeys,
+ prevAdditionalFilteringKey, inputDesc, context, spec);
}
- builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+ builder.contributeHyracksOperator(insertDeleteUpsertOp, runtimeAndConstraints.first);
builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
- ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+ ILogicalOperator src = insertDeleteUpsertOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, insertDeleteUpsertOp, 0);
}
@Override
@@ -132,4 +139,12 @@
public boolean expensiveThanMaterialization() {
return false;
}
+
+ public List<LogicalVariable> getPrevSecondaryKeys() {
+ return prevSecondaryKeys;
+ }
+
+ public LogicalVariable getPrevFilteringKeys() {
+ return prevAdditionalFilteringKey;
+ }
}
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
similarity index 83%
rename from algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
rename to algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
index c774f44..d844f37 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InsertDeleteUpsertPOperator.java
@@ -34,12 +34,11 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
@@ -47,19 +46,24 @@
import org.apache.hyracks.api.job.JobSpecification;
@SuppressWarnings("rawtypes")
-public class InsertDeletePOperator extends AbstractPhysicalOperator {
+public class InsertDeleteUpsertPOperator extends AbstractPhysicalOperator {
private LogicalVariable payload;
private List<LogicalVariable> keys;
private IDataSource<?> dataSource;
private final List<LogicalVariable> additionalFilteringKeys;
+ private final Kind operation;
+ private final LogicalVariable prevPayload;
- public InsertDeletePOperator(LogicalVariable payload, List<LogicalVariable> keys,
- List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource) {
+ public InsertDeleteUpsertPOperator(LogicalVariable payload, List<LogicalVariable> keys,
+ List<LogicalVariable> additionalFilteringKeys, IDataSource dataSource, Kind operation,
+ LogicalVariable prevPayload) {
this.payload = payload;
this.keys = keys;
this.dataSource = dataSource;
this.additionalFilteringKeys = additionalFilteringKeys;
+ this.operation = operation;
+ this.prevPayload = prevPayload;
}
@Override
@@ -67,10 +71,11 @@
return PhysicalOperatorTag.INSERT_DELETE;
}
+ // Delivered Properties of this will be (Sorted on PK, Partitioned on PK)
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ deliveredProperties = op2.getDeliveredPhysicalProperties().clone();
}
@Override
@@ -90,8 +95,8 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
- InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+ throws AlgebricksException {
+ InsertDeleteUpsertOperator insertDeleteOp = (InsertDeleteUpsertOperator) op;
IMetadataProvider mp = context.getMetadataProvider();
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
JobSpecification spec = builder.getJobSpec();
@@ -99,12 +104,17 @@
context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
- if (insertDeleteOp.getOperation() == Kind.INSERT) {
+ if (operation == Kind.INSERT) {
runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
additionalFilteringKeys, inputDesc, context, spec, false);
- } else {
+ } else if (operation == Kind.DELETE) {
runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
additionalFilteringKeys, inputDesc, context, spec);
+ } else if (operation == Kind.UPSERT) {
+ runtimeAndConstraints = mp.getUpsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
+ additionalFilteringKeys, prevPayload, inputDesc, context, spec);
+ } else {
+ throw new AlgebricksException("Unsupported Operation " + operation);
}
builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
index e1983d2..98427fe 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -35,7 +35,7 @@
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 17f2285..cf0f1c2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -38,10 +38,10 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -325,12 +325,16 @@
}
@Override
- public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+ public String visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+ String header = getIndexOpString(op.getOperation());
addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
.append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
+ if (op.getOperation() == Kind.UPSERT) {
+ buffer.append(" out: ([" + op.getPrevRecordVar() + "] <-{record-before-upsert}) ");
+ }
if (op.isBulkload()) {
buffer.append(" [bulkload]");
}
@@ -338,19 +342,38 @@
}
@Override
- public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+ public String visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Integer indent)
throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
+ String header = getIndexOpString(op.getOperation());
addIndent(buffer, indent).append(header).append(op.getIndexName()).append(" on ")
.append(op.getDataSourceIndex().getDataSource()).append(" from ");
- pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+ if (op.getOperation() == Kind.UPSERT) {
+ buffer.append(" replace:");
+ pprintExprList(op.getPrevSecondaryKeyExprs(), buffer, indent);
+ buffer.append(" with:");
+ pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+ } else {
+ pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+ }
if (op.isBulkload()) {
buffer.append(" [bulkload]");
}
return buffer.toString();
}
+ public String getIndexOpString(Kind opKind) {
+ switch (opKind) {
+ case DELETE:
+ return "delete from ";
+ case INSERT:
+ return "insert into ";
+ case UPSERT:
+ return "upsert into ";
+ }
+ return null;
+ }
+
@Override
public String visitTokenizeOperator(TokenizeOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 285812c..53c8b69 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -29,9 +29,9 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -113,9 +113,9 @@
public R visitWriteResultOperator(WriteResultOperator op, T arg) throws AlgebricksException;
- public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
+ public R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
- public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
+ public R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T tag) throws AlgebricksException;
public R visitExternalDataLookupOperator(ExternalDataLookupOperator op, T arg) throws AlgebricksException;
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
index b5ce212..73628c1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/IQueryOperatorVisitor.java
@@ -19,8 +19,8 @@
package org.apache.hyracks.algebricks.core.algebra.visitors;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
@@ -43,12 +43,12 @@
}
@Override
- public default R visitInsertDeleteOperator(InsertDeleteOperator op, T arg) {
+ public default R visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, T arg) {
throw new UnsupportedOperationException();
}
@Override
- public default R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T arg) {
+ public default R visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, T arg) {
throw new UnsupportedOperationException();
}
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 8f9ab9f..8bf3dbb 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -84,7 +84,7 @@
List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig)
- throws AlgebricksException {
+ throws AlgebricksException {
PigletFileDataSource ds = (PigletFileDataSource) dataSource;
FileSplit[] fileSplits = ds.getFileSplits();
@@ -141,15 +141,15 @@
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc)
- throws AlgebricksException {
+ throws AlgebricksException {
PigletFileDataSink ds = (PigletFileDataSink) sink;
FileSplit[] fileSplits = ds.getFileSplits();
String[] locations = new String[fileSplits.length];
for (int i = 0; i < fileSplits.length; ++i) {
locations[i] = fileSplits[i].getNodeName();
}
- IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories, fileSplits[0]
- .getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
+ IPushRuntimeFactory prf = new SinkWriterRuntimeFactory(printColumns, printerFactories,
+ fileSplits[0].getLocalFile().getFile(), PrinterBasedWriterFactory.INSTANCE, inputDesc);
AlgebricksAbsolutePartitionConstraint constraint = new AlgebricksAbsolutePartitionConstraint(locations);
return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(prf, constraint);
}
@@ -178,15 +178,11 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
- IDataSourceIndex<String, String> dataSource,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields,
- ILogicalExpression filterExpr,
- RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification spec, boolean bulkload) throws AlgebricksException {
+ IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
+ boolean bulkload) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
@@ -197,16 +193,16 @@
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
- IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
- ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
@@ -235,4 +231,24 @@
return null;
}
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(IDataSource<String> dataSource,
+ IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, LogicalVariable prevPayload,
+ RecordDescriptor recordDesc, JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
+ IDataSourceIndex<String, String> dataSourceIndex, IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
+ ILogicalExpression filterExpr, List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor inputDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 394524b..d85ffe9 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -23,7 +23,6 @@
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
-
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -45,9 +44,10 @@
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
@@ -62,8 +62,8 @@
import org.apache.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeletePOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeletePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -95,7 +95,8 @@
}
@Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+ public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+ throws AlgebricksException {
AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
// if (context.checkIfInDontApplySet(this, op)) {
// return false;
@@ -116,7 +117,7 @@
}
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, boolean topLevelOp,
IOptimizationContext context) throws AlgebricksException {
PhysicalOptimizationConfig physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
@@ -152,8 +153,8 @@
if (gby.getNestedPlans().size() == 1) {
ILogicalPlan p0 = gby.getNestedPlans().get(0);
if (p0.getRoots().size() == 1) {
- if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE
- || gby.getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
+ if (gby.getAnnotations().get(OperatorAnnotations.USE_HASH_GROUP_BY) == Boolean.TRUE || gby
+ .getAnnotations().get(OperatorAnnotations.USE_EXTERNAL_GROUP_BY) == Boolean.TRUE) {
if (!topLevelOp) {
throw new NotImplementedException(
"External hash group-by for nested grouping is not implemented.");
@@ -213,8 +214,8 @@
}
}
if (topLevelOp) {
- op.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig
- .getMaxFramesExternalSort()));
+ op.setPhysicalOperator(
+ new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
} else {
op.setPhysicalOperator(new InMemoryStableSortPOperator());
}
@@ -282,12 +283,13 @@
additionalFilteringKeys = new ArrayList<LogicalVariable>();
getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
- op.setPhysicalOperator(new WriteResultPOperator(opLoad.getDataSource(), payload, keys,
- additionalFilteringKeys));
+ op.setPhysicalOperator(
+ new WriteResultPOperator(opLoad.getDataSource(), payload, keys, additionalFilteringKeys));
break;
}
- case INSERT_DELETE: {
- InsertDeleteOperator opLoad = (InsertDeleteOperator) op;
+ case INSERT_DELETE_UPSERT: {
+ // Primary index
+ InsertDeleteUpsertOperator opLoad = (InsertDeleteUpsertOperator) op;
LogicalVariable payload;
List<LogicalVariable> keys = new ArrayList<LogicalVariable>();
List<LogicalVariable> additionalFilteringKeys = null;
@@ -297,16 +299,17 @@
getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
if (opLoad.isBulkload()) {
- op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad
- .getDataSource()));
+ op.setPhysicalOperator(
+ new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad.getDataSource()));
} else {
- op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
- .getDataSource()));
+ op.setPhysicalOperator(new InsertDeleteUpsertPOperator(payload, keys, additionalFilteringKeys,
+ opLoad.getDataSource(), opLoad.getOperation(), opLoad.getPrevRecordVar()));
}
break;
}
- case INDEX_INSERT_DELETE: {
- IndexInsertDeleteOperator opInsDel = (IndexInsertDeleteOperator) op;
+ case INDEX_INSERT_DELETE_UPSERT: {
+ // Secondary index
+ IndexInsertDeleteUpsertOperator opInsDel = (IndexInsertDeleteUpsertOperator) op;
List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
List<LogicalVariable> additionalFilteringKeys = null;
@@ -317,13 +320,24 @@
getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
if (opInsDel.isBulkload()) {
- op.setPhysicalOperator(new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
- additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ op.setPhysicalOperator(
+ new IndexBulkloadPOperator(primaryKeys, secondaryKeys, additionalFilteringKeys,
+ opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
} else {
- op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
- additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ List<LogicalVariable> prevSecondaryKeys = null;
+ LogicalVariable prevAdditionalFilteringKey = null;
+ if (opInsDel.getOperation() == Kind.UPSERT) {
+ prevSecondaryKeys = new ArrayList<LogicalVariable>();
+ getKeys(opInsDel.getPrevSecondaryKeyExprs(), prevSecondaryKeys);
+ if (opInsDel.getPrevAdditionalFilteringExpression() != null) {
+ prevAdditionalFilteringKey = ((VariableReferenceExpression) (opInsDel
+ .getPrevAdditionalFilteringExpression()).getValue()).getVariableReference();
+ }
+ }
+ op.setPhysicalOperator(new IndexInsertDeleteUpsertPOperator(primaryKeys, secondaryKeys,
+ additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex(),
+ prevSecondaryKeys, prevAdditionalFilteringKey));
}
-
break;
}
@@ -335,8 +349,8 @@
getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
// Tokenize Operator only operates with a bulk load on a data set with an index
if (opTokenize.isBulkload()) {
- op.setPhysicalOperator(new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize
- .getDataSourceIndex()));
+ op.setPhysicalOperator(
+ new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize.getDataSourceIndex()));
}
break;
}
@@ -415,8 +429,8 @@
int n = aggOp.getExpressions().size();
List<Mutable<ILogicalExpression>> mergeExpressionRefs = new ArrayList<Mutable<ILogicalExpression>>();
for (int i = 0; i < n; i++) {
- ILogicalExpression mergeExpr = mergeAggregationExpressionFactory.createMergeAggregation(
- originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
+ ILogicalExpression mergeExpr = mergeAggregationExpressionFactory
+ .createMergeAggregation(originalAggVars.get(i), aggFuncRefs.get(i).getValue(), context);
if (mergeExpr == null) {
return false;
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index a89e895..70d4f80 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -20,6 +20,7 @@
import java.io.DataInputStream;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.hyracks.api.comm.FrameConstants;
import org.apache.hyracks.api.comm.FrameHelper;
@@ -72,9 +73,8 @@
@Override
public int getTupleStartOffset(int tupleIndex) {
- int offset = tupleIndex == 0 ?
- FrameConstants.TUPLE_START_OFFSET :
- IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
+ int offset = tupleIndex == 0 ? FrameConstants.TUPLE_START_OFFSET
+ : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex);
return start + offset;
}
@@ -85,16 +85,15 @@
@Override
public int getTupleEndOffset(int tupleIndex) {
- return start + IntSerDeUtils
- .getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
+ return start
+ + IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * (tupleIndex + 1));
}
@Override
public int getFieldStartOffset(int tupleIndex, int fIdx) {
- return fIdx == 0 ?
- 0 :
- IntSerDeUtils
- .getInt(buffer.array(), getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
+ return fIdx == 0 ? 0
+ : IntSerDeUtils.getInt(buffer.array(),
+ getTupleStartOffset(tupleIndex) + (fIdx - 1) * FrameConstants.SIZE_LEN);
}
@Override
@@ -161,4 +160,44 @@
public int getFieldCount() {
return recordDescriptor.getFieldCount();
}
+
+ /*
+ * The two methods below can be used for debugging.
+ * They are safe as they don't print records. Printing records
+ * using IserializerDeserializer can print incorrect results or throw exceptions.
+ * A better way yet would be to use record pointable.
+ */
+ public void prettyPrint(String prefix, int[] recordFields) {
+ ByteBufferInputStream bbis = new ByteBufferInputStream();
+ DataInputStream dis = new DataInputStream(bbis);
+ int tc = getTupleCount();
+ StringBuilder sb = new StringBuilder();
+ sb.append(prefix).append("TC: " + tc).append("\n");
+ for (int i = 0; i < tc; ++i) {
+ prettyPrint(i, bbis, dis, sb, recordFields);
+ }
+ System.err.println(sb.toString());
+ }
+
+ protected void prettyPrint(int tid, ByteBufferInputStream bbis, DataInputStream dis, StringBuilder sb,
+ int[] recordFields) {
+ Arrays.sort(recordFields);
+ sb.append(" tid" + tid + ":(" + getTupleStartOffset(tid) + ", " + getTupleEndOffset(tid) + ")[");
+ for (int j = 0; j < getFieldCount(); ++j) {
+ sb.append("f" + j + ":(" + getFieldStartOffset(tid, j) + ", " + getFieldEndOffset(tid, j) + ") ");
+ sb.append("{");
+ bbis.setByteBuffer(buffer, getTupleStartOffset(tid) + getFieldSlotsLength() + getFieldStartOffset(tid, j));
+ try {
+ if (Arrays.binarySearch(recordFields, j) >= 0) {
+ sb.append("a record field: only print using pointable");
+ } else {
+ sb.append(recordDescriptor.getFields()[j].deserialize(dis));
+ }
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ }
+ sb.append("}");
+ }
+ sb.append("\n");
+ }
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
index abf9c37..15ebe52 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/accessors/FrameTupleReference.java
@@ -21,9 +21,9 @@
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-public final class FrameTupleReference implements IFrameTupleReference {
- private IFrameTupleAccessor fta;
- private int tIndex;
+public class FrameTupleReference implements IFrameTupleReference {
+ protected IFrameTupleAccessor fta;
+ protected int tIndex;
public void reset(IFrameTupleAccessor fta, int tIndex) {
this.fta = fta;
diff --git a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
index 0399ede..f1a411b 100644
--- a/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
+++ b/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/RangePredicate.java
@@ -70,10 +70,12 @@
this.highKeyCmp = highKeyCmp;
}
+ @Override
public MultiComparator getLowKeyComparator() {
return lowKeyCmp;
}
+ @Override
public MultiComparator getHighKeyComparator() {
return highKeyCmp;
}
@@ -86,6 +88,7 @@
this.highKeyCmp = highKeyCmp;
}
+ @Override
public ITupleReference getLowKey() {
return lowKey;
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
index 5aac5a6..7ec4a30 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
@@ -22,33 +22,42 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
/**
- * This operation callback allows for arbitrary actions to be taken while traversing
- * an index structure. The {@link IModificationOperationCallback} will be called on
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link IModificationOperationCallback} will be called on
* all modifying operations (e.g. insert, update, delete...) for all indexes.
- *
* @author zheilbron
*/
public interface IModificationOperationCallback {
+ public enum Operation {
+ INSERT,
+ DELETE
+ }
/**
- * This method is called on a tuple that is about to traverse an index's structure
- * (i.e. before any pages are pinned or latched).
- *
+ * This method is called on a tuple that is about to traverse an index's structure
+ * (i.e. before any pages are pinned or latched).
* The format of the tuple is the format that would be stored in the index itself.
- *
- * @param tuple the tuple that is about to be operated on
+ * @param tuple
+ * the tuple that is about to be operated on
*/
public void before(ITupleReference tuple) throws HyracksDataException;
/**
- * This method is called on a tuple when a tuple with a matching key is found for the
- * current operation. It is possible that tuple is null, in which case no tuple with a
+ * This method is called on a tuple when a tuple with a matching key is found for the
+ * current operation. It is possible that tuple is null, in which case no tuple with a
* matching key was found.
- *
- * When found is called, the leaf page where the tuple resides will be pinned and latched,
+ * When found is called, the leaf page where the tuple resides will be pinned and latched,
* so blocking operations should be avoided.
- *
- * @param tuple a tuple with a matching key, otherwise null if none exists
+ * @param tuple
+ * a tuple with a matching key, otherwise null if none exists
*/
public void found(ITupleReference before, ITupleReference after) throws HyracksDataException;
+
+ /**
+ * This call specifies the next operation to be performed. It is used to allow
+ * a single operator to perform different operations per tuple
+ * @param op
+ * @throws HyracksDataException
+ */
+ public void setOp(Operation op) throws HyracksDataException;
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
index 08ad1d0..9b1cd47 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchOperationCallback.java
@@ -22,50 +22,56 @@
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
/**
- * This operation callback allows for arbitrary actions to be taken while traversing
- * an index structure. The {@link ISearchOperationCallback} will be called on
+ * This operation callback allows for arbitrary actions to be taken while traversing
+ * an index structure. The {@link ISearchOperationCallback} will be called on
* all search operations for ordered indexes only.
- *
* @author zheilbron
*/
public interface ISearchOperationCallback {
/**
- * During an index search operation, this method will be called on tuples as they are
- * passed by with a search cursor. This call will be invoked while a leaf page is latched
- * and pinned. If the call returns false, then the page will be unlatched and unpinned and
- * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded
+ * After the harness enters the operation components and before an index search operation starts,
+ * this method will be called on the search key.
+ * @param tuple
+ * the tuple containing the search key (expected to be a point search key)
+ */
+ public void before(ITupleReference tuple) throws HyracksDataException;
+
+ /**
+ * During an index search operation, this method will be called on tuples as they are
+ * passed by with a search cursor. This call will be invoked while a leaf page is latched
+ * and pinned. If the call returns false, then the page will be unlatched and unpinned and
+ * {@link #reconcile(ITupleReference)} will be called with the tuple that was not proceeded
* on.
- *
- * @param tuple the tuple that is being passed over by the search cursor
+ * @param tuple
+ * the tuple that is being passed over by the search cursor
* @return true to proceed otherwise false to unlatch and unpin, leading to reconciliation
*/
public boolean proceed(ITupleReference tuple) throws HyracksDataException;
/**
- * This method is only called on a tuple that was not 'proceeded' on
- * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile
- * by performing any necessary actions before resuming the search (e.g. a try-lock may have
+ * This method is only called on a tuple that was not 'proceeded' on
+ * (see {@link #proceed(ITupleReference)}). This method allows an opportunity to reconcile
+ * by performing any necessary actions before resuming the search (e.g. a try-lock may have
* failed in the proceed call, and now in reconcile we should take a full (blocking) lock).
- *
- * @param tuple the tuple that failed to proceed
+ * @param tuple
+ * the tuple that failed to proceed
*/
public void reconcile(ITupleReference tuple) throws HyracksDataException;
/**
- * This method is only called on a tuple that was reconciled on, but not found after
- * retraversing. This method allows an opportunity to cancel some action that was taken in
+ * This method is only called on a tuple that was reconciled on, but not found after
+ * retraversing. This method allows an opportunity to cancel some action that was taken in
* {@link #reconcile(ITupleReference))}.
- *
- * @param tuple the tuple that was previously reconciled
+ * @param tuple
+ * the tuple that was previously reconciled
*/
public void cancel(ITupleReference tuple) throws HyracksDataException;
-
+
/**
* This method is only called on a tuple that was reconciled on, and found after
- * retraversing. This method allows an opportunity to do some subsequent action that was
+ * retraversing. This method allows an opportunity to do some subsequent action that was
* taken in {@link #reconcile(ITupleReference))}.
- *
* @param tuple
* the tuple that was previously reconciled
*/
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
index 94a6a27..1c0d143 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ISearchPredicate.java
@@ -21,10 +21,20 @@
import java.io.Serializable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
public interface ISearchPredicate extends Serializable {
- public MultiComparator getLowKeyComparator();
+ public MultiComparator getLowKeyComparator();
- public MultiComparator getHighKeyComparator();
+ public MultiComparator getHighKeyComparator();
+
+ /**
+ * Get the search key to be used with point search operation on primary index.
+ * This method will only be called with point search predicates that only happen in primary index.
+ * @return
+ * @throws HyracksDataException
+ */
+ public ITupleReference getLowKey();
}
\ No newline at end of file
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index b632ba9..e8ab8dc 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -27,7 +27,7 @@
/**
* Dummy operation callback that simply does nothing.
*/
-public enum NoOpOperationCallback implements IModificationOperationCallback, ISearchOperationCallback {
+public enum NoOpOperationCallback implements IModificationOperationCallback,ISearchOperationCallback {
INSTANCE;
@Override
@@ -42,12 +42,12 @@
@Override
public void before(ITupleReference tuple) {
- // Do nothing.
+ // Do nothing.
}
@Override
public void found(ITupleReference before, ITupleReference after) {
- // Do nothing.
+ // Do nothing.
}
@Override
@@ -59,4 +59,9 @@
public void complete(ITupleReference tuple) throws HyracksDataException {
// Do nothing.
}
+
+ @Override
+ public void setOp(Operation op) throws HyracksDataException {
+ // Do nothing.
+ }
}
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
index 5f27835..b9792c2 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/tuples/PermutingFrameTupleReference.java
@@ -19,51 +19,40 @@
package org.apache.hyracks.storage.am.common.tuples;
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-public class PermutingFrameTupleReference implements IFrameTupleReference {
- private IFrameTupleAccessor fta;
- private int tIndex;
- private int[] fieldPermutation;
+public class PermutingFrameTupleReference extends FrameTupleReference {
+ private int[] fieldPermutation;
- public void setFieldPermutation(int[] fieldPermutation) {
- this.fieldPermutation = fieldPermutation;
- }
+ public PermutingFrameTupleReference(int[] fieldPermutation) {
+ this.fieldPermutation = fieldPermutation;
+ }
- public void reset(IFrameTupleAccessor fta, int tIndex) {
- this.fta = fta;
- this.tIndex = tIndex;
- }
+ public PermutingFrameTupleReference() {
+ }
- @Override
- public IFrameTupleAccessor getFrameTupleAccessor() {
- return fta;
- }
+ public void setFieldPermutation(int[] fieldPermutation) {
+ this.fieldPermutation = fieldPermutation;
+ }
- @Override
- public int getTupleIndex() {
- return tIndex;
- }
+ @Override
+ public int getFieldCount() {
+ return fieldPermutation.length;
+ }
- @Override
- public int getFieldCount() {
- return fieldPermutation.length;
- }
+ @Override
+ public byte[] getFieldData(int fIdx) {
+ return fta.getBuffer().array();
+ }
- @Override
- public byte[] getFieldData(int fIdx) {
- return fta.getBuffer().array();
- }
+ @Override
+ public int getFieldStart(int fIdx) {
+ return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
+ + fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
+ }
- @Override
- public int getFieldStart(int fIdx) {
- return fta.getTupleStartOffset(tIndex) + fta.getFieldSlotsLength()
- + fta.getFieldStartOffset(tIndex, fieldPermutation[fIdx]);
- }
-
- @Override
- public int getFieldLength(int fIdx) {
- return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
- }
+ @Override
+ public int getFieldLength(int fIdx) {
+ return fta.getFieldLength(tIndex, fieldPermutation[fIdx]);
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index ae3aa52..13c6949 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -271,13 +271,13 @@
operationalComponents.clear();
switch (ctx.getOperation()) {
case UPDATE:
- case UPSERT:
case PHYSICALDELETE:
case FLUSH:
case DELETE:
operationalComponents.add(memoryComponents.get(cmc));
break;
case INSERT:
+ case UPSERT:
addOperationalMutableComponents(operationalComponents);
operationalComponents.addAll(immutableComponents);
break;
diff --git a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
index 08891c2..fa25524 100644
--- a/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java
@@ -54,8 +54,8 @@
public IndexOperation op;
public final MultiComparator cmp;
public final MultiComparator bloomFilterCmp;
- public final IModificationOperationCallback modificationCallback;
- public final ISearchOperationCallback searchCallback;
+ public IModificationOperationCallback modificationCallback;
+ public ISearchOperationCallback searchCallback;
private final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
private final List<ILSMComponent> componentsToBeReplicated;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 63e651a..4c4ed28 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -28,8 +28,8 @@
public interface ILSMHarness {
- public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
- IndexException;
+ public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
+ throws HyracksDataException, IndexException;
public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
throws HyracksDataException, IndexException;
@@ -45,14 +45,14 @@
public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException;
- public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException;
+ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException;
public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException;
- public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException;
+ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException;
public void addBulkLoadedComponent(ILSMComponent index) throws HyracksDataException, IndexException;
@@ -62,5 +62,4 @@
LSMOperationType opType) throws HyracksDataException;
public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException;
-
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
index 08bcfee..99f981d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java
@@ -39,6 +39,6 @@
public void setSearchPredicate(ISearchPredicate searchPredicate);
public ISearchPredicate getSearchPredicate();
-
+
public List<ILSMComponent> getComponentsToBeReplicated();
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
index b172864..d99c9e8 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.java
@@ -22,17 +22,19 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INullWriterFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.AbstractTreeIndexOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.common.IStorageManagerInterface;
import org.apache.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -49,18 +51,20 @@
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
IndexOperation op, IIndexDataflowHelperFactory dataflowHelperFactory,
- ITupleFilterFactory tupleFilterFactory, IModificationOperationCallbackFactory modificationOpCallbackProvider) {
+ ITupleFilterFactory tupleFilterFactory, INullWriterFactory nullWriterFactory,
+ IModificationOperationCallbackFactory modificationOpCallbackProvider,
+ ISearchOperationCallbackFactory searchOpCallbackProvider) {
super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false,
- false, null,
- NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackProvider);
+ comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, tupleFilterFactory, false, false,
+ nullWriterFactory, NoOpLocalResourceFactoryProvider.INSTANCE, searchOpCallbackProvider,
+ modificationOpCallbackProvider);
this.fieldPermutation = fieldPermutation;
this.op = op;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new LSMIndexInsertUpdateDeleteOperatorNodePushable(this, ctx, partition, fieldPermutation,
recordDescProvider, op);
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 54500a6..21b0d8a 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -81,7 +81,8 @@
if (!((AbstractMemoryLSMComponent) flushingComponent).isModified()) {
//The mutable component has not been modified by any writer. There is nothing to flush.
//since the component is empty, set its state back to READABLE_WRITABLE
- if (((AbstractLSMIndex) lsmIndex).getCurrentMutableComponentState() == ComponentState.READABLE_UNWRITABLE) {
+ if (((AbstractLSMIndex) lsmIndex)
+ .getCurrentMutableComponentState() == ComponentState.READABLE_UNWRITABLE) {
((AbstractLSMIndex) lsmIndex)
.setCurrentMutableComponentState(ComponentState.READABLE_WRITABLE);
}
@@ -196,8 +197,8 @@
if (c.getType() == LSMComponentType.MEMORY) {
switch (c.getState()) {
case READABLE_UNWRITABLE:
- if (isMutableComponent
- && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
+ if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
+ || opType == LSMOperationType.FORCE_MODIFICATION)) {
lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
}
break;
@@ -256,8 +257,8 @@
e.printStackTrace();
throw e;
} finally {
- if (failedOperation
- && (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION)) {
+ if (failedOperation && (opType == LSMOperationType.MODIFICATION
+ || opType == LSMOperationType.FORCE_MODIFICATION)) {
//When the operation failed, completeOperation() method must be called
//in order to decrement active operation count which was incremented in beforeOperation() method.
opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
@@ -314,8 +315,8 @@
}
@Override
- public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
- IndexException {
+ public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
+ throws HyracksDataException, IndexException {
LSMOperationType opType = LSMOperationType.FORCE_MODIFICATION;
modify(ctx, false, tuple, opType);
}
@@ -357,6 +358,7 @@
ctx.setSearchPredicate(pred);
getAndEnterComponents(ctx, opType, false);
try {
+ ctx.getSearchOperationCallback().before(pred.getLowKey());
lsmIndex.search(ctx, cursor, pred);
} catch (HyracksDataException | IndexException e) {
exitComponents(ctx, opType, null, true);
@@ -386,8 +388,8 @@
}
@Override
- public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException {
+ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Started a flush operation for index: " + lsmIndex + " ...");
}
@@ -434,8 +436,8 @@
}
@Override
- public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException {
+ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
index 284fc1b..009bd36 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java
@@ -109,13 +109,13 @@
OnDiskInvertedIndexFactory diskInvIndexFactory, BTreeFactory deletedKeysBTreeFactory,
BloomFilterFactory bloomFilterFactory, ILSMComponentFilterFactory filterFactory,
ILSMComponentFilterFrameFactory filterFrameFactory, LSMComponentFilterManager filterManager,
- double bloomFilterFalsePositiveRate, ILSMIndexFileManager fileManager,
- IFileMapProvider diskFileMapProvider, ITypeTraits[] invListTypeTraits,
- IBinaryComparatorFactory[] invListCmpFactories, ITypeTraits[] tokenTypeTraits,
- IBinaryComparatorFactory[] tokenCmpFactories, IBinaryTokenizerFactory tokenizerFactory,
- ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker, ILSMIOOperationScheduler ioScheduler,
- ILSMIOOperationCallback ioOpCallback, int[] invertedIndexFields, int[] filterFields,
- int[] filterFieldsForNonBulkLoadOps, int[] invertedIndexFieldsForNonBulkLoadOps, boolean durable) throws IndexException {
+ double bloomFilterFalsePositiveRate, ILSMIndexFileManager fileManager, IFileMapProvider diskFileMapProvider,
+ ITypeTraits[] invListTypeTraits, IBinaryComparatorFactory[] invListCmpFactories,
+ ITypeTraits[] tokenTypeTraits, IBinaryComparatorFactory[] tokenCmpFactories,
+ IBinaryTokenizerFactory tokenizerFactory, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ ILSMIOOperationScheduler ioScheduler, ILSMIOOperationCallback ioOpCallback, int[] invertedIndexFields,
+ int[] filterFields, int[] filterFieldsForNonBulkLoadOps, int[] invertedIndexFieldsForNonBulkLoadOps,
+ boolean durable) throws IndexException {
super(virtualBufferCaches, diskInvIndexFactory.getBufferCache(), fileManager, diskFileMapProvider,
bloomFilterFalsePositiveRate, mergePolicy, opTracker, ioScheduler, ioOpCallback, filterFrameFactory,
filterManager, filterFields, durable);
@@ -136,13 +136,14 @@
for (IVirtualBufferCache virtualBufferCache : virtualBufferCaches) {
InMemoryInvertedIndex memInvIndex = createInMemoryInvertedIndex(virtualBufferCache,
new VirtualMetaDataPageManager(virtualBufferCache.getNumPages()), i);
- BTree deleteKeysBTree = BTreeUtils.createBTree(virtualBufferCache, new VirtualMetaDataPageManager(
- virtualBufferCache.getNumPages()), virtualBufferCache.getFileMapProvider(),
- invListTypeTraits, invListCmpFactories, BTreeLeafFrameType.REGULAR_NSM, new FileReference(new File(
- fileManager.getBaseDir() + "_virtual_del_" + i)));
+ BTree deleteKeysBTree = BTreeUtils.createBTree(virtualBufferCache,
+ new VirtualMetaDataPageManager(virtualBufferCache.getNumPages()),
+ virtualBufferCache.getFileMapProvider(), invListTypeTraits, invListCmpFactories,
+ BTreeLeafFrameType.REGULAR_NSM,
+ new FileReference(new File(fileManager.getBaseDir() + "_virtual_del_" + i)));
LSMInvertedIndexMemoryComponent mutableComponent = new LSMInvertedIndexMemoryComponent(memInvIndex,
- deleteKeysBTree, virtualBufferCache, i == 0 ? true : false, filterFactory == null ? null
- : filterFactory.createLSMComponentFilter());
+ deleteKeysBTree, virtualBufferCache, i == 0 ? true : false,
+ filterFactory == null ? null : filterFactory.createLSMComponentFilter());
memoryComponents.add(mutableComponent);
++i;
}
@@ -271,6 +272,7 @@
case FLUSH:
case DELETE:
case INSERT:
+ case UPSERT:
operationalComponents.add(memoryComponents.get(cmc));
break;
case SEARCH:
@@ -336,8 +338,8 @@
indexTuple = tuple;
}
- ctx.modificationCallback.before(indexTuple);
- ctx.modificationCallback.found(null, indexTuple);
+ ctx.getModificationCallback().before(indexTuple);
+ ctx.getModificationCallback().found(null, indexTuple);
switch (ctx.getOperation()) {
case INSERT: {
// Insert into the in-memory inverted index.
@@ -362,8 +364,8 @@
}
if (ctx.filterTuple != null) {
ctx.filterTuple.reset(tuple);
- memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter()
- .update(ctx.filterTuple, ctx.filterCmp);
+ memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter().update(ctx.filterTuple,
+ ctx.filterCmp);
}
}
@@ -383,9 +385,8 @@
IIndexAccessor invIndexAccessor = ((LSMInvertedIndexMemoryComponent) component).getInvIndex()
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
indexAccessors.add(invIndexAccessor);
- IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexMemoryComponent) component)
- .getDeletedKeysBTree().createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ IIndexAccessor deletedKeysAccessor = ((LSMInvertedIndexMemoryComponent) component).getDeletedKeysBTree()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
deletedKeysBTreeAccessors.add(deletedKeysAccessor);
} else {
IIndexAccessor invIndexAccessor = ((LSMInvertedIndexDiskComponent) component).getInvIndex()
@@ -415,8 +416,8 @@
initState = new LSMInvertedIndexSearchCursorInitialState(keyCmp, keysOnlyTuple, indexAccessors,
deletedKeysBTreeAccessors,
((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get()))
- .getDeletedKeysBTree().getLeafFrameFactory(), ictx, includeMutableComponent, lsmHarness,
- operationalComponents);
+ .getDeletedKeysBTree().getLeafFrameFactory(),
+ ictx, includeMutableComponent, lsmHarness, operationalComponents);
} else {
LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) memoryComponents
.get(currentMutableComponentId.get());
@@ -424,8 +425,9 @@
MultiComparator tokensAndKeysCmp = MultiComparator.create(memInvIndex.getBTree().getComparatorFactories());
initState = new LSMInvertedIndexRangeSearchCursorInitialState(tokensAndKeysCmp, keyCmp, keysOnlyTuple,
((LSMInvertedIndexMemoryComponent) memoryComponents.get(currentMutableComponentId.get()))
- .getDeletedKeysBTree().getLeafFrameFactory(), includeMutableComponent, lsmHarness,
- indexAccessors, deletedKeysBTreeAccessors, pred, operationalComponents);
+ .getDeletedKeysBTree().getLeafFrameFactory(),
+ includeMutableComponent, lsmHarness, indexAccessors, deletedKeysBTreeAccessors, pred,
+ operationalComponents);
}
return initState;
}
@@ -453,8 +455,8 @@
opCtx.setOperation(IndexOperation.FLUSH);
opCtx.getComponentHolder().add(flushingComponent);
ioScheduler.scheduleOperation(new LSMInvertedIndexFlushOperation(
- new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent, componentFileRefs
- .getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
+ new LSMInvertedIndexAccessor(lsmHarness, opCtx), flushingComponent,
+ componentFileRefs.getInsertIndexFileReference(), componentFileRefs.getDeleteIndexFileReference(),
componentFileRefs.getBloomFilterFileReference(), callback, fileManager.getBaseDir()));
}
@@ -494,13 +496,12 @@
filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple());
filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
filterManager.writeFilterInfo(component.getLSMComponentFilter(),
- ((OnDiskInvertedIndex) component.getInvIndex()).getBTree()
- );
+ ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
}
invIndexBulkLoader.end();
- IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree().createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ IIndexAccessor deletedKeysBTreeAccessor = flushingComponent.getDeletedKeysBTree()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
IIndexCursor btreeCountingCursor = ((BTreeAccessor) deletedKeysBTreeAccessor).createCountingSearchCursor();
deletedKeysBTreeAccessor.search(btreeCountingCursor, nullPred);
long numBTreeTuples = 0L;
@@ -549,7 +550,8 @@
@Override
public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
throws HyracksDataException, IndexException {
- LSMInvertedIndexOpContext ictx = createOpContext(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ LSMInvertedIndexOpContext ictx = createOpContext(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
ictx.setOperation(IndexOperation.MERGE);
List<ILSMComponent> mergingComponents = ctx.getComponentHolder();
IIndexCursor cursor = new LSMInvertedIndexRangeSearchCursor(ictx);
@@ -648,8 +650,7 @@
}
filterManager.updateFilterInfo(component.getLSMComponentFilter(), filterTuples);
filterManager.writeFilterInfo(component.getLSMComponentFilter(),
- ((OnDiskInvertedIndex) component.getInvIndex()).getBTree()
- );
+ ((OnDiskInvertedIndex) component.getInvIndex()).getBTree());
}
invIndexBulkLoader.end();
@@ -676,7 +677,8 @@
public IIndexBulkLoader createBulkLoader(float fillFactor, boolean verifyInput, long numElementsHint,
boolean checkIfEmptyIndex, boolean appendOnly) throws IndexException {
try {
- return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex, appendOnly);
+ return new LSMInvertedIndexBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ appendOnly);
} catch (HyracksDataException e) {
throw new IndexException(e);
}
@@ -706,10 +708,10 @@
}
invIndexBulkLoader = ((LSMInvertedIndexDiskComponent) component).getInvIndex().createBulkLoader(fillFactor,
verifyInput, numElementsHint, false, true);
-
+
//validity of the component depends on the deleted keys file being there even if it's empty.
- deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree().createBulkLoader(fillFactor,
- verifyInput, numElementsHint, false, true);
+ deletedKeysBTreeBulkLoader = ((LSMInvertedIndexDiskComponent) component).getDeletedKeysBTree()
+ .createBulkLoader(fillFactor, verifyInput, numElementsHint, false, true);
if (filterFields != null) {
indexTuple = new PermutingTupleReference(invertedIndexFields);
@@ -782,11 +784,11 @@
@Override
public void abort() throws HyracksDataException {
- if( invIndexBulkLoader != null){
+ if (invIndexBulkLoader != null) {
invIndexBulkLoader.abort();
}
- if(deletedKeysBTreeBulkLoader != null){
+ if (deletedKeysBTreeBulkLoader != null) {
deletedKeysBTreeBulkLoader.abort();
}
}
@@ -800,11 +802,10 @@
}
protected LSMInvertedIndexDiskComponent createDiskInvIndexComponent(ILSMComponentFactory factory,
- FileReference dictBTreeFileRef, FileReference btreeFileRef, FileReference bloomFilterFileRef, boolean create)
- throws HyracksDataException, IndexException {
- LSMInvertedIndexDiskComponent component = (LSMInvertedIndexDiskComponent) factory
- .createLSMComponentInstance(new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef,
- bloomFilterFileRef));
+ FileReference dictBTreeFileRef, FileReference btreeFileRef, FileReference bloomFilterFileRef,
+ boolean create) throws HyracksDataException, IndexException {
+ LSMInvertedIndexDiskComponent component = (LSMInvertedIndexDiskComponent) factory.createLSMComponentInstance(
+ new LSMComponentFileReferences(dictBTreeFileRef, btreeFileRef, bloomFilterFileRef));
if (create) {
component.getBloomFilter().create();
component.getBloomFilter().activate();
@@ -884,10 +885,10 @@
LSMInvertedIndexDiskComponent invIndexComponent = (LSMInvertedIndexDiskComponent) lsmComponent;
OnDiskInvertedIndex invIndex = (OnDiskInvertedIndex) invIndexComponent.getInvIndex();
IBufferCache bufferCache = invIndex.getBufferCache();
- markAsValidInternal(invIndex.getBufferCache(),invIndexComponent.getBloomFilter());
+ markAsValidInternal(invIndex.getBufferCache(), invIndexComponent.getBloomFilter());
// Flush inverted index second.
- bufferCache.force(invIndex.getInvListsFileId(),true);
+ bufferCache.force(invIndex.getInvListsFileId(), true);
markAsValidInternal(invIndex.getBTree());
// Flush deleted keys BTree.
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index 49b1b03..c511a67 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@ -44,8 +44,8 @@
private final List<ILSMComponent> componentsToBeMerged;
private final List<ILSMComponent> componentsToBeReplicated;
- public final IModificationOperationCallback modificationCallback;
- public final ISearchOperationCallback searchCallback;
+ private IModificationOperationCallback modificationCallback;
+ private ISearchOperationCallback searchCallback;
// Tuple that only has the inverted-index elements (aka keys), projecting away the document fields.
public PermutingTupleReference keysOnlyTuple;
@@ -79,10 +79,10 @@
for (int i = 0; i < mutableComponents.size(); i++) {
LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) mutableComponents
.get(i);
- mutableInvIndexAccessors[i] = (IInvertedIndexAccessor) mutableComponent.getInvIndex().createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
- deletedKeysBTreeAccessors[i] = mutableComponent.getDeletedKeysBTree().createAccessor(
- NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ mutableInvIndexAccessors[i] = (IInvertedIndexAccessor) mutableComponent.getInvIndex()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ deletedKeysBTreeAccessors[i] = mutableComponent.getDeletedKeysBTree()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
}
assert mutableComponents.size() > 0;
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
index 91058af..274e399 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexSearchCursorInitialState.java
@@ -48,11 +48,10 @@
private final List<ILSMComponent> operationalComponents;
- public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp,
- PermutingTupleReference keysOnlyTuple, List<IIndexAccessor> indexAccessors,
- List<IIndexAccessor> deletedKeysBTreeAccessors, ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory,
- IIndexOperationContext ctx, boolean includeMemComponent, ILSMHarness lsmHarness,
- List<ILSMComponent> operationalComponents) {
+ public LSMInvertedIndexSearchCursorInitialState(final MultiComparator keyCmp, PermutingTupleReference keysOnlyTuple,
+ List<IIndexAccessor> indexAccessors, List<IIndexAccessor> deletedKeysBTreeAccessors,
+ ITreeIndexFrameFactory deletedKeysBtreeLeafFrameFactory, IIndexOperationContext ctx,
+ boolean includeMemComponent, ILSMHarness lsmHarness, List<ILSMComponent> operationalComponents) {
this.keyCmp = keyCmp;
this.keysOnlyTuple = keysOnlyTuple;
this.indexAccessors = indexAccessors;
@@ -62,7 +61,7 @@
this.operationalComponents = operationalComponents;
this.lsmHarness = lsmHarness;
this.ctx = (LSMInvertedIndexOpContext) ctx;
- this.searchCallback = this.ctx.searchCallback;
+ this.searchCallback = this.ctx.getSearchOperationCallback();
}
@Override
@@ -121,7 +120,7 @@
public List<IIndexAccessor> getDeletedKeysBTreeAccessors() {
return deletedKeysBTreeAccessors;
}
-
+
public ITreeIndexFrameFactory getgetDeletedKeysBTreeLeafFrameFactory() {
return deletedKeysBtreeLeafFrameFactory;
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java
index f379759..e37f007 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/search/InvertedIndexSearchPredicate.java
@@ -80,4 +80,9 @@
// TODO: This doesn't make sense for an inverted index. Change ISearchPredicate interface.
return null;
}
+
+ @Override
+ public ITupleReference getLowKey() {
+ return null;
+ }
}
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
index 61284f0..bb0c8d6 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java
@@ -214,6 +214,7 @@
case INSERT:
case DELETE:
case FLUSH:
+ case UPSERT:
operationalComponents.add(memoryComponents.get(cmc));
break;
case SEARCH:
@@ -251,14 +252,6 @@
public void search(ILSMIndexOperationContext ictx, IIndexCursor cursor, ISearchPredicate pred)
throws HyracksDataException, IndexException {
LSMRTreeOpContext ctx = (LSMRTreeOpContext) ictx;
- //List<ILSMComponent> operationalComponents = ictx.getComponentHolder();
-
- /*
- LSMRTreeCursorInitialState initialState = new LSMRTreeCursorInitialState(rtreeLeafFrameFactory,
- rtreeInteriorFrameFactory, btreeLeafFrameFactory, ctx.getBTreeMultiComparator(), lsmHarness,
- comparatorFields, linearizerArray, ctx.searchCallback, operationalComponents);
- */
-
cursor.open(ctx.searchInitialState, pred);
}
@@ -353,8 +346,8 @@
indexTuple = tuple;
}
- ctx.modificationCallback.before(indexTuple);
- ctx.modificationCallback.found(null, indexTuple);
+ ctx.getModificationCallback().before(indexTuple);
+ ctx.getModificationCallback().found(null, indexTuple);
if (ctx.getOperation() == IndexOperation.INSERT) {
ctx.currentMutableRTreeAccessor.insert(indexTuple);
} else {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
index f839e15..3c91d62 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java
@@ -123,7 +123,6 @@
/**
* Opens LSMRTree, cleaning up invalid files from base dir, and registering
* all valid files as on-disk RTrees and BTrees.
- *
* @throws HyracksDataException
*/
@Override
@@ -493,8 +492,8 @@
indexTuple = tuple;
}
- ctx.modificationCallback.before(indexTuple);
- ctx.modificationCallback.found(null, indexTuple);
+ ctx.getModificationCallback().before(indexTuple);
+ ctx.getModificationCallback().found(null, indexTuple);
if (ctx.getOperation() == IndexOperation.INSERT) {
ctx.currentMutableRTreeAccessor.insert(indexTuple);
} else {
diff --git a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 2238af0..96c3380 100644
--- a/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@ -55,8 +55,8 @@
public final List<ILSMComponent> componentHolder;
private final List<ILSMComponent> componentsToBeMerged;
private final List<ILSMComponent> componentsToBeReplicated;
- public final IModificationOperationCallback modificationCallback;
- public final ISearchOperationCallback searchCallback;
+ private IModificationOperationCallback modificationCallback;
+ private ISearchOperationCallback searchCallback;
public final PermutingTupleReference indexTuple;
public final MultiComparator filterCmp;
public final PermutingTupleReference filterTuple;
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
index 5d8ce2e..6370458 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeSearchCursor.java
@@ -218,7 +218,7 @@
pred = (SearchPredicate) searchPred;
cmp = pred.getLowKeyComparator();
- searchKey = pred.getSearchKey();
+ searchKey = pred.getLowKey();
if (searchKey != null) {
int maxFieldPos = cmp.getKeyFieldCount() / 2;
diff --git a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java
index 643eb8e..da03608 100644
--- a/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java
+++ b/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/SearchPredicate.java
@@ -42,7 +42,8 @@
this.cmp = cmp;
}
- public ITupleReference getSearchKey() {
+ @Override
+ public ITupleReference getLowKey() {
return searchKey;
}
@@ -50,10 +51,12 @@
this.searchKey = searchKey;
}
+ @Override
public MultiComparator getLowKeyComparator() {
return cmp;
}
+ @Override
public MultiComparator getHighKeyComparator() {
return cmp;
}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
index eebbf66..a9b2e1f 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -18,11 +18,6 @@
*/
package org.apache.hyracks.storage.am.btree;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
@@ -32,6 +27,10 @@
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
public abstract class AbstractModificationOperationCallbackTest extends AbstractOperationCallbackTest {
@@ -48,11 +47,13 @@
this.isFoundNull = true;
}
+ @Override
@Before
public void setup() throws Exception {
super.setup();
}
+ @Override
@After
public void tearDown() throws Exception {
super.tearDown();
@@ -98,6 +99,10 @@
Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, after));
}
+ @Override
+ public void setOp(Operation op) throws HyracksDataException {
+ }
+
}
}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
index 2094676..20a2c0f 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractSearchOperationCallbackTest.java
@@ -26,11 +26,6 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
@@ -41,6 +36,10 @@
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
public abstract class AbstractSearchOperationCallbackTest extends AbstractOperationCallbackTest {
private static final int NUM_TASKS = 2;
@@ -57,12 +56,14 @@
this.insertTaskStarted = false;
}
+ @Override
@Before
public void setup() throws Exception {
executor = Executors.newFixedThreadPool(NUM_TASKS);
super.setup();
}
+ @Override
@After
public void tearDown() throws Exception {
executor.shutdown();
@@ -192,7 +193,10 @@
@Override
public void complete(ITupleReference tuple) throws HyracksDataException {
+ }
+ @Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
}
}
diff --git a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
index 8e6a11d..15d8a60 100644
--- a/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
+++ b/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
@@ -25,7 +25,7 @@
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
-public enum TestOperationCallback implements ISearchOperationCallback, IModificationOperationCallback {
+public enum TestOperationCallback implements ISearchOperationCallback,IModificationOperationCallback {
INSTANCE;
private static final int RANDOM_SEED = 50;
@@ -48,12 +48,12 @@
@Override
public void before(ITupleReference tuple) {
- // Do nothing.
+ // Do nothing.
}
@Override
public void found(ITupleReference before, ITupleReference after) {
- // Do nothing.
+ // Do nothing.
}
@Override
@@ -66,4 +66,9 @@
// Do nothing.
}
+ @Override
+ public void setOp(Operation op) throws HyracksDataException {
+ // Do nothing.
+ }
+
}
diff --git a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
index 6712d1c..20dfb9c 100644
--- a/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
+++ b/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeSearchOperationCallbackTest.java
@@ -22,9 +22,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-import org.junit.Assert;
-import org.junit.Test;
-
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference;
@@ -41,6 +38,8 @@
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeTestHarness;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import org.apache.hyracks.storage.am.lsm.common.impls.NoOpOperationTrackerProvider;
+import org.junit.Assert;
+import org.junit.Test;
public class LSMBTreeSearchOperationCallbackTest extends AbstractSearchOperationCallbackTest {
private final LSMBTreeTestHarness harness;
@@ -207,6 +206,9 @@
expectedTupleToBeLockedValue);
}
+ @Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+ }
}
}