- Added Tokenize Operator in addition to the bulkload operator changes that were made by Zachary Heilbron.
The tokenize operator is only added to the logical plan when bulk-loading the data.
- Each secondary index is now updated in a separate branch by using the replicate operator.
- Sink Operator now accepts multiple inputs.
- Fixed the bulk-load so that it correctly produces auto-generated PKs.
Change-Id: Ifb591754dba5eb4a9207edaa4e658f4cc745893a
Reviewed-on: http://fulliautomatix.ics.uci.edu:8443/78
Reviewed-by: Young-Seok Kim <kisskys@gmail.com>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index e15ffe0..bface33 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -17,7 +17,6 @@
public enum LogicalOperatorTag {
AGGREGATE,
ASSIGN,
- CLUSTER,
DATASOURCESCAN,
DISTINCT,
DISTRIBUTE_RESULT,
@@ -46,5 +45,6 @@
INDEX_INSERT_DELETE,
UPDATE,
EXTENSION_OPERATOR,
- EXTERNAL_LOOKUP
+ EXTERNAL_LOOKUP,
+ TOKENIZE
}
\ No newline at end of file
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index c41107b..48f8230 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -19,6 +19,8 @@
ASSIGN,
BROADCAST_EXCHANGE,
BTREE_SEARCH,
+ BULKLOAD,
+ INDEX_BULKLOAD,
STATS,
DATASOURCE_SCAN,
DISTRIBUTE_RESULT,
@@ -62,5 +64,6 @@
LENGTH_PARTITIONED_INVERTED_INDEX_SEARCH,
PARTITIONINGSPLIT,
EXTENSION_OPERATOR,
- EXTERNAL_LOOKUP
+ EXTERNAL_LOOKUP,
+ TOKENIZE
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 997053f..129201d 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -64,7 +64,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification jobSpec) throws AlgebricksException;
+ JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException;
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
@@ -74,7 +74,7 @@
/**
* Creates the insert runtime of IndexInsertDeletePOperator, which models
* insert/delete operations into a secondary index.
- *
+ *
* @param dataSource
* Target secondary index.
* @param propagatedSchema
@@ -108,12 +108,12 @@
IDataSourceIndex<I, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec) throws AlgebricksException;
+ JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException;
/**
* Creates the delete runtime of IndexInsertDeletePOperator, which models
* insert/delete operations into a secondary index.
- *
+ *
* @param dataSource
* Target secondary index.
* @param propagatedSchema
@@ -149,6 +149,42 @@
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException;
+ /**
+ * Creates the tokenizer runtime for a TokenizeOperator placed in front of
+ * IndexInsertDeletePOperator. For a length-partitioned index it tokenizes the
+ * secondary key into [token, number of tokens] pairs; for a non
+ * length-partitioned index it tokenizes the secondary key into [token].
+ *
+ * @param dataSource
+ * Target secondary index.
+ * @param propagatedSchema
+ * Output schema of the operator to be created.
+ * NOTE(review): the earlier text said "insert/delete operator"; it looks
+ * copied from the index insert/delete runtime docs - confirm.
+ * @param inputSchemas
+ * Schemas of the inputs of the operator to be created.
+ * NOTE(review): the earlier text said "Output schemas"; presumably a
+ * copy-paste slip - confirm against implementers.
+ * @param typeEnv
+ * Type environment of the original IndexInsertDeleteOperator operator.
+ * @param primaryKeys
+ * Variables for the dataset's primary keys that the dataSource secondary index belongs to.
+ * @param secondaryKeys
+ * Variables for the secondary-index keys.
+ * @param filterExpr
+ * Filtering expression to be pushed inside the runtime op.
+ * Such a filter may, e.g., exclude NULLs from being inserted/deleted.
+ * @param recordDesc
+ * Output record descriptor of the runtime op to be created.
+ * @param context
+ * Job generation context.
+ * @param spec
+ * Target job specification.
+ * @param bulkload
+ * Presumably true when the runtime is created for a bulk-load job;
+ * TODO confirm against implementers.
+ * @return
+ * A Hyracks IOperatorDescriptor and its partition constraint.
+ * @throws AlgebricksException
+ */
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+ IDataSourceIndex<I, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException;
+
public IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId) throws AlgebricksException;
public IFunctionInfo lookupFunction(FunctionIdentifier fid);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
index 78efd0d..c6e93fe 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/AbstractLogicalOperator.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
index ea45cc6..c58215e 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/IndexInsertDeleteOperator.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -24,6 +24,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
@@ -35,19 +36,24 @@
private final IDataSourceIndex<?, ?> dataSourceIndex;
private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+ // In the bulk-load case on an ngram or keyword index,
+ // it contains [token, number of tokens] or [token].
+ // Otherwise, it contains the secondary key information.
private final List<Mutable<ILogicalExpression>> secondaryKeyExprs;
private final Mutable<ILogicalExpression> filterExpr;
private final Kind operation;
+ private final boolean bulkload;
private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
public IndexInsertDeleteOperator(IDataSourceIndex<?, ?> dataSourceIndex,
List<Mutable<ILogicalExpression>> primaryKeyExprs, List<Mutable<ILogicalExpression>> secondaryKeyExprs,
- Mutable<ILogicalExpression> filterExpr, Kind operation) {
+ Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload) {
this.dataSourceIndex = dataSourceIndex;
this.primaryKeyExprs = primaryKeyExprs;
this.secondaryKeyExprs = secondaryKeyExprs;
this.filterExpr = filterExpr;
this.operation = operation;
+ this.bulkload = bulkload;
}
@Override
@@ -105,6 +111,10 @@
return dataSourceIndex;
}
+ /**
+ * Returns the index name, taken as the string form of the id of the
+ * underlying data source index.
+ */
+ public String getIndexName() {
+ return dataSourceIndex.getId().toString();
+ }
+
public List<Mutable<ILogicalExpression>> getSecondaryKeyExpressions() {
return secondaryKeyExprs;
}
@@ -117,6 +127,10 @@
return operation;
}
+ /**
+ * Returns the bulkload flag this operator was constructed with.
+ */
+ public boolean isBulkload() {
+ return bulkload;
+ }
+
public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
this.additionalFilteringExpressions = additionalFilteringExpressions;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
index 94fd197..134506c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/InsertDeleteOperator.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,17 +38,23 @@
}
private final IDataSource<?> dataSource;
+
private final Mutable<ILogicalExpression> payloadExpr;
+
private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+
private final Kind operation;
private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
- public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payload,
- List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation) {
+ private final boolean bulkload;
+
+ public InsertDeleteOperator(IDataSource<?> dataSource, Mutable<ILogicalExpression> payloadExpr,
+ List<Mutable<ILogicalExpression>> primaryKeyExprs, Kind operation, boolean bulkload) {
this.dataSource = dataSource;
- this.payloadExpr = payload;
+ this.payloadExpr = payloadExpr;
this.primaryKeyExprs = primaryKeyExprs;
this.operation = operation;
+ this.bulkload = bulkload;
}
@Override
@@ -58,15 +64,13 @@
}
@Override
- public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
- boolean b = false;
- b = visitor.transform(payloadExpr);
- for (int i = 0; i < primaryKeyExprs.size(); i++) {
- if (visitor.transform(primaryKeyExprs.get(i))) {
- b = true;
- }
+ public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform) throws AlgebricksException {
+ boolean changed = false;
+ changed = transform.transform(payloadExpr);
+ for (Mutable<ILogicalExpression> e : primaryKeyExprs) {
+ changed |= transform.transform(e);
}
- return b;
+ return changed;
}
@Override
@@ -110,6 +114,10 @@
return operation;
}
+ /**
+ * Returns the bulkload flag this operator was constructed with.
+ */
+ public boolean isBulkload() {
+ return bulkload;
+ }
+
public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
this.additionalFilteringExpressions = additionalFilteringExpressions;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
index bdad6d4..84ad33b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/SinkOperator.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,8 +15,10 @@
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
import java.util.ArrayList;
+import java.util.List;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
@@ -24,6 +26,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypeEnvPointer;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.OpRefTypeEnvPointer;
import edu.uci.ics.hyracks.algebricks.core.algebra.typing.PropagatingTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
@@ -33,7 +36,14 @@
@Override
public void recomputeSchema() throws AlgebricksException {
- schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+ schema = new ArrayList<LogicalVariable>();
+ for (int i = 0; i < inputs.size(); i++) {
+ for (LogicalVariable v : inputs.get(i).getValue().getSchema()) {
+ if (!schema.contains(v))
+ schema.add(v);
+ }
+
+ }
}
@Override
@@ -58,11 +68,14 @@
@Override
public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
- ITypeEnvPointer[] envPointers = new ITypeEnvPointer[1];
- envPointers[0] = new OpRefTypeEnvPointer(inputs.get(0), ctx);
- PropagatingTypeEnvironment env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
- ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
- return env;
+ ITypeEnvPointer[] envPointers = new ITypeEnvPointer[inputs.size()];
+ for (int i = 0; i < inputs.size(); i++) {
+ envPointers[i] = new OpRefTypeEnvPointer(inputs.get(i), ctx);
+ }
+ PropagatingTypeEnvironment env = new PropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+ ctx.getNullableTypeComputer(), ctx.getMetadataProvider(), TypePropagationPolicy.ALL, envPointers);
+ return env;
+
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
new file mode 100644
index 0000000..82c1035
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/TokenizeOperator.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Logical operator (tag {@link LogicalOperatorTag#TOKENIZE}) that carries the
+ * primary/secondary key expressions of a secondary-index update and exposes
+ * extra "tokenize" variables on top of the schema of its input.
+ * <p>
+ * The output schema is the schema of the first input followed by the tokenize
+ * variables; each tokenize variable is typed by the entry found at the same
+ * position in the tokenize variable types.
+ */
+public class TokenizeOperator extends AbstractLogicalOperator {
+
+    private final IDataSourceIndex<?, ?> dataSourceIndex;
+    private final List<Mutable<ILogicalExpression>> primaryKeyExprs;
+    private final List<Mutable<ILogicalExpression>> secondaryKeyExprs;
+    // Variables added by this operator: the token and the number of tokens.
+    private final List<LogicalVariable> tokenizeVars;
+    private final Mutable<ILogicalExpression> filterExpr;
+    private final Kind operation;
+    private final boolean bulkload;
+    private final boolean isPartitioned;
+    // Type of each variable in tokenizeVars, position by position.
+    private final List<Object> tokenizeVarTypes;
+    private List<Mutable<ILogicalExpression>> additionalFilteringExpressions;
+
+    public TokenizeOperator(IDataSourceIndex<?, ?> dataSourceIndex,
+            List<Mutable<ILogicalExpression>> primaryKeyExprs,
+            List<Mutable<ILogicalExpression>> secondaryKeyExprs, List<LogicalVariable> tokenizeVars,
+            Mutable<ILogicalExpression> filterExpr, Kind operation, boolean bulkload, boolean isPartitioned,
+            List<Object> tokenizeVarTypes) {
+        this.dataSourceIndex = dataSourceIndex;
+        this.primaryKeyExprs = primaryKeyExprs;
+        this.secondaryKeyExprs = secondaryKeyExprs;
+        this.tokenizeVars = tokenizeVars;
+        this.filterExpr = filterExpr;
+        this.operation = operation;
+        this.bulkload = bulkload;
+        this.isPartitioned = isPartitioned;
+        this.tokenizeVarTypes = tokenizeVarTypes;
+    }
+
+    /**
+     * Rebuilds the schema as: variables of the first input, then the tokenize variables.
+     */
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> inputSchema = inputs.get(0).getValue().getSchema();
+        schema.addAll(inputSchema);
+        schema.addAll(tokenizeVars);
+    }
+
+    /**
+     * Applies the transform to every primary-key and every secondary-key expression.
+     *
+     * @return true if the transform reported a change for at least one expression.
+     */
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor)
+            throws AlgebricksException {
+        // Both lists are always visited, even when the first one already reported a change.
+        boolean primaryChanged = transformAll(visitor, primaryKeyExprs);
+        boolean secondaryChanged = transformAll(visitor, secondaryKeyExprs);
+        return primaryChanged || secondaryChanged;
+    }
+
+    // Runs the transform over each expression reference and reports whether any call returned true.
+    private static boolean transformAll(ILogicalExpressionReferenceTransform visitor,
+            List<Mutable<ILogicalExpression>> exprRefs) throws AlgebricksException {
+        boolean changed = false;
+        for (Mutable<ILogicalExpression> exprRef : exprRefs) {
+            changed |= visitor.transform(exprRef);
+        }
+        return changed;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitTokenizeOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    /**
+     * Returns a policy that propagates all variables of the first source and
+     * then adds the tokenize variables.
+     */
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new VariablePropagationPolicy() {
+
+            @Override
+            public void propagateVariables(IOperatorSchema target, IOperatorSchema... sources)
+                    throws AlgebricksException {
+                target.addAllVariables(sources[0]);
+                for (LogicalVariable tokenizeVar : tokenizeVars) {
+                    target.addVariable(tokenizeVar);
+                }
+            }
+        };
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.TOKENIZE;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        IVariableTypeEnvironment env = createPropagatingAllInputsTypeEnvironment(ctx);
+
+        // A secondary index that is not length-partitioned yields one new
+        // output variable (the token); a length-partitioned one yields two
+        // (the token and the number of tokens).
+        // The token has the type of the secondary key or, when the secondary
+        // key is a list, the type of the list's elements.
+        // Together with the PK, the tokenizer generates
+        // [token, number of tokens, PK] tuples.
+        final int varCount = tokenizeVars.size();
+        for (int idx = 0; idx < varCount; idx++) {
+            LogicalVariable tokenizeVar = tokenizeVars.get(idx);
+            env.setVarType(tokenizeVar, tokenizeVarTypes.get(idx));
+        }
+
+        return env;
+    }
+
+    public List<Mutable<ILogicalExpression>> getPrimaryKeyExpressions() {
+        return primaryKeyExprs;
+    }
+
+    public IDataSourceIndex<?, ?> getDataSourceIndex() {
+        return dataSourceIndex;
+    }
+
+    public List<Mutable<ILogicalExpression>> getSecondaryKeyExpressions() {
+        return secondaryKeyExprs;
+    }
+
+    public List<LogicalVariable> getTokenizeVars() {
+        return tokenizeVars;
+    }
+
+    public Mutable<ILogicalExpression> getFilterExpression() {
+        return filterExpr;
+    }
+
+    public Kind getOperation() {
+        return operation;
+    }
+
+    public boolean isBulkload() {
+        return bulkload;
+    }
+
+    public boolean isPartitioned() {
+        return isPartitioned;
+    }
+
+    public List<Object> getTokenizeVarTypes() {
+        return tokenizeVarTypes;
+    }
+
+    public void setAdditionalFilteringExpressions(List<Mutable<ILogicalExpression>> additionalFilteringExpressions) {
+        this.additionalFilteringExpressions = additionalFilteringExpressions;
+    }
+
+    public List<Mutable<ILogicalExpression>> getAdditionalFilteringExpressions() {
+        return additionalFilteringExpressions;
+    }
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index ae0346e..266ead8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -67,6 +67,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -500,6 +501,13 @@
}
+ /**
+ * Handles a tokenize operator by delegating to setEmptyFDsEqClasses(op, ctx),
+ * the same call the sink-operator visit below makes; always returns null.
+ */
@Override
+ public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext ctx)
+ throws AlgebricksException {
+ setEmptyFDsEqClasses(op, ctx);
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, IOptimizationContext ctx) throws AlgebricksException {
setEmptyFDsEqClasses(op, ctx);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index b56eb8c..804c0e7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,6 +59,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -468,6 +469,19 @@
}
@Override
+    public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg) throws AlgebricksException {
+        // Only another tokenize operator can be isomorphic to this one.
+        if (((AbstractLogicalOperator) arg).getOperatorTag() != LogicalOperatorTag.TOKENIZE) {
+            return Boolean.FALSE;
+        }
+        TokenizeOperator candidate = (TokenizeOperator) copyAndSubstituteVar(op, arg);
+        // Isomorphic when the schemas hold the same variables (in any order)
+        // and both operators target the same data source index.
+        boolean sameSchema = VariableUtilities.varListEqualUnordered(op.getSchema(), candidate.getSchema());
+        boolean sameIndex = op.getDataSourceIndex().equals(candidate.getDataSourceIndex());
+        return sameSchema && sameIndex;
+    }
+
+ @Override
public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
return true;
}
@@ -786,7 +800,7 @@
List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newKeyExpressions, op.getAdditionalFilteringExpressions());
InsertDeleteOperator insertDeleteOp = new InsertDeleteOperator(op.getDataSource(),
- deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation());
+ deepCopyExpressionRef(op.getPayloadExpression()), newKeyExpressions, op.getOperation(), op.isBulkload());
insertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
return insertDeleteOp;
}
@@ -803,11 +817,33 @@
List<Mutable<ILogicalExpression>> newLSMComponentFilterExpressions = new ArrayList<Mutable<ILogicalExpression>>();
deepCopyExpressionRefs(newLSMComponentFilterExpressions, op.getAdditionalFilteringExpressions());
IndexInsertDeleteOperator indexInsertDeleteOp = new IndexInsertDeleteOperator(op.getDataSourceIndex(),
- newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation());
+ newPrimaryKeyExpressions, newSecondaryKeyExpressions, newFilterExpression, op.getOperation(), op.isBulkload());
indexInsertDeleteOp.setAdditionalFilteringExpressions(newLSMComponentFilterExpressions);
return indexInsertDeleteOp;
+ }
+
+    /**
+     * Builds a copy of the given tokenize operator: key expressions and the
+     * filter expression are cloned, tokenize variables and their types are
+     * copied into fresh lists, and the remaining settings (data source index,
+     * operation, bulkload and partitioned flags) are carried over as-is.
+     *
+     * @param op
+     *            Operator to copy.
+     * @param arg
+     *            Unused.
+     * @return The newly created TokenizeOperator.
+     * @throws AlgebricksException
+     */
+    @Override
+    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        List<Mutable<ILogicalExpression>> newPrimaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newPrimaryKeyExpressions, op.getPrimaryKeyExpressions());
+        List<Mutable<ILogicalExpression>> newSecondaryKeyExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+        deepCopyExpressionRefs(newSecondaryKeyExpressions, op.getSecondaryKeyExpressions());
+        List<LogicalVariable> newTokenizeVars = new ArrayList<LogicalVariable>();
+        deepCopyVars(newTokenizeVars, op.getTokenizeVars());
+
+        // getFilterExpression() returns the Mutable wrapper, not the expression
+        // itself; casting the wrapper to AbstractLogicalExpression fails at
+        // runtime, so unwrap it before cloning. A missing filter stays missing.
+        Mutable<ILogicalExpression> oldFilterExpression = op.getFilterExpression();
+        Mutable<ILogicalExpression> newFilterExpression = null;
+        if (oldFilterExpression != null) {
+            ILogicalExpression oldFilterValue = oldFilterExpression.getValue();
+            newFilterExpression = new MutableObject<ILogicalExpression>(oldFilterValue == null ? null
+                    : ((AbstractLogicalExpression) oldFilterValue).cloneExpression());
+        }
+
+        List<Object> newTokenizeVarTypes = new ArrayList<Object>();
+        deepCopyObjects(newTokenizeVarTypes, op.getTokenizeVarTypes());
+
+        TokenizeOperator tokenizeOp = new TokenizeOperator(op.getDataSourceIndex(),
+                newPrimaryKeyExpressions, newSecondaryKeyExpressions,
+                newTokenizeVars, newFilterExpression, op.getOperation(),
+                op.isBulkload(), op.isPartitioned(), newTokenizeVarTypes);
+
+        // Carry over the additional filtering expressions as the index
+        // insert/delete copy does; they are optional, so only copy when set.
+        if (op.getAdditionalFilteringExpressions() != null) {
+            List<Mutable<ILogicalExpression>> newAdditionalFilteringExpressions = new ArrayList<Mutable<ILogicalExpression>>();
+            deepCopyExpressionRefs(newAdditionalFilteringExpressions, op.getAdditionalFilteringExpressions());
+            tokenizeOp.setAdditionalFilteringExpressions(newAdditionalFilteringExpressions);
+        }
+        return tokenizeOp;
     }
+
@Override
public ILogicalOperator visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
return new SinkOperator();
@@ -825,6 +861,18 @@
((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
}
+    /**
+     * Appends every variable of oldVars to newVars and returns newVars.
+     * The variable objects themselves are shared, not cloned.
+     */
+    private List<LogicalVariable> deepCopyVars(List<LogicalVariable> newVars, List<LogicalVariable> oldVars) {
+        newVars.addAll(oldVars);
+        return newVars;
+    }
+
+    /**
+     * Appends every element of oldObjs to newObjs and returns newObjs.
+     * The elements themselves are shared, not cloned.
+     */
+    private List<Object> deepCopyObjects(List<Object> newObjs, List<Object> oldObjs) {
+        newObjs.addAll(oldObjs);
+        return newObjs;
+    }
+
private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index ef91b38..ea8e82b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -57,6 +57,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -251,6 +252,13 @@
}
@Override
+ public Void visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg) throws AlgebricksException {
+     // Tokenize is handled by mapVariablesStandard, exactly like the sink operator visited next.
+     mapVariablesStandard(op, arg);
+     return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 225edd6..388d9f9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -49,6 +49,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -260,6 +261,13 @@
}
@Override
+ public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext arg) throws AlgebricksException {
+     // No logical properties are derived for the tokenize operator; this visit does nothing.
+     // NOTE(review): this is still the IDE-generated stub body -- confirm that a no-op is intended.
+     return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 3b8ed56..bf184e9 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -52,6 +52,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -244,6 +245,12 @@
}
@Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ producedVariables.addAll(op.getTokenizeVars()); // the tokenize variables are what this operator contributes
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index b8cc3b8..b31bc3b 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -53,6 +53,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -282,6 +283,12 @@
}
@Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ standardLayout(op); // delegates to standardLayout, the same call the sink visitor makes
+ return null;
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Void arg) throws AlgebricksException {
standardLayout(op);
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 729a996..b2d6022 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,6 +54,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -400,6 +401,19 @@
}
@Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+         throws AlgebricksException {
+     for (Mutable<ILogicalExpression> primaryKeyExpr : op.getPrimaryKeyExpressions()) {
+         primaryKeyExpr.getValue().substituteVar(pair.first, pair.second); // applied to each primary-key expression
+     }
+     for (Mutable<ILogicalExpression> secondaryKeyExpr : op.getSecondaryKeyExpressions()) {
+         secondaryKeyExpr.getValue().substituteVar(pair.first, pair.second); // and to each secondary-key expression
+     }
+     substVarTypes(op, pair); // the same pair is then handed to substVarTypes for this operator
+     return null; // NOTE(review): the operator's filter expression is not rewritten here -- confirm intended
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
return null;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 2bd9205..0209492 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -53,6 +53,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -357,6 +358,17 @@
}
@Override
+ public Void visitTokenizeOperator(TokenizeOperator op, Void arg) {
+     for (Mutable<ILogicalExpression> primaryKeyExpr : op.getPrimaryKeyExpressions()) {
+         primaryKeyExpr.getValue().getUsedVariables(usedVariables); // each expression reports into usedVariables
+     }
+     for (Mutable<ILogicalExpression> secondaryKeyExpr : op.getSecondaryKeyExpressions()) {
+         secondaryKeyExpr.getValue().getUsedVariables(usedVariables);
+     }
+     return null; // NOTE(review): the operator's filter expression is not inspected here -- confirm intended
+ }
+
+ @Override
public Void visitSinkOperator(SinkOperator op, Void arg) {
return null;
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 4407b75..66e7b98 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -70,6 +70,14 @@
return new PhysicalRequirements(req, IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
+ protected PhysicalRequirements emptyUnaryRequirements(int numberOfChildren) {
+     // Builds one requirement slot per child, each holding the shared EMPTY_PROPERTIES_VECTOR.
+     StructuralPropertiesVector[] childRequirements = new StructuralPropertiesVector[numberOfChildren];
+     java.util.Arrays.fill(childRequirements, StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR);
+     // The requirements are paired with NO_COORDINATION, as in the zero-argument variant above.
+     return new PhysicalRequirements(childRequirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
@Override
public void disableJobGenBelowMe() {
this.disableJobGenBelow = true;
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
new file mode 100644
index 0000000..f8fdd46
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/BulkloadPOperator.java
@@ -0,0 +1,105 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class BulkloadPOperator extends AbstractPhysicalOperator {
+ // Physical operator that builds the runtime for a bulk-load insert into a data source.
+ private final LogicalVariable payload; // handed to getInsertRuntime as the payload variable
+ private final List<LogicalVariable> primaryKeys; // the "keys" constructor argument
+ private final List<LogicalVariable> additionalFilteringKeys;
+ private final IDataSource<?> dataSource; // insert target; its properties provider also drives the input requirements
+
+ public BulkloadPOperator(LogicalVariable payload, List<LogicalVariable> keys,
+ List<LogicalVariable> additionalFilteringKeys, IDataSource<?> dataSource) {
+ this.payload = payload;
+ this.primaryKeys = keys;
+ this.additionalFilteringKeys = additionalFilteringKeys;
+ this.dataSource = dataSource;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.BULKLOAD;
+ }
+ /** Requires from the single input the partitioning and local properties the data source computes for the scan variables. */
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<LogicalVariable> scanVariables = new ArrayList<>();
+ scanVariables.addAll(primaryKeys);
+ scanVariables.add(new LogicalVariable(-1)); // placeholder variable (id -1) after the keys -- presumably the record; TODO confirm
+ IPhysicalPropertiesVector physicalProps = dataSource.getPropertiesProvider().computePropertiesVector(
+ scanVariables);
+ StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
+ physicalProps.getLocalProperties());
+ return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+ /** Delivers a clone of the physical properties delivered by the first input. */
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+ /** Obtains the insert runtime from the metadata provider, contributes it to the job and connects it to the first input. */
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ InsertDeleteOperator insertDeleteOp = (InsertDeleteOperator) op;
+ assert insertDeleteOp.getOperation() == Kind.INSERT; // asserts are only checked when JVM assertions are enabled
+ assert insertDeleteOp.isBulkload();
+
+ IMetadataProvider mp = context.getMetadataProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getInsertRuntime(
+ dataSource, propagatedSchema, typeEnv, primaryKeys, payload, additionalFilteringKeys,
+ inputDesc, context, spec, true); // final argument is true here; InsertDeletePOperator passes false
+ builder.contributeHyracksOperator(insertDeleteOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = insertDeleteOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, insertDeleteOp, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
new file mode 100644
index 0000000..e0d8d79
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexBulkloadPOperator.java
@@ -0,0 +1,126 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class IndexBulkloadPOperator extends AbstractPhysicalOperator {
+ // Physical operator that builds the runtime for bulk-loading an index of a data source.
+ private final List<LogicalVariable> primaryKeys;
+ private final List<LogicalVariable> secondaryKeys;
+ private final List<LogicalVariable> additionalFilteringKeys;
+ private final Mutable<ILogicalExpression> filterExpr; // NOTE(review): stored but never read in this class
+ private final IDataSourceIndex<?, ?> dataSourceIndex;
+
+ public IndexBulkloadPOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, Mutable<ILogicalExpression> filterExpr,
+ IDataSourceIndex<?, ?> dataSourceIndex) {
+ this.primaryKeys = primaryKeys;
+ this.secondaryKeys = secondaryKeys;
+ this.additionalFilteringKeys = additionalFilteringKeys;
+ this.filterExpr = filterExpr;
+ this.dataSourceIndex = dataSourceIndex;
+ }
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.INDEX_BULKLOAD;
+ }
+ /** Requires the input to use the data source's partitioning and an ascending order on the secondary keys, then the primary keys. */
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ List<LogicalVariable> scanVariables = new ArrayList<>();
+ scanVariables.addAll(primaryKeys);
+ scanVariables.add(new LogicalVariable(-1)); // placeholder variable (id -1) after the keys -- presumably the record; TODO confirm
+ IPhysicalPropertiesVector physicalProps = dataSourceIndex.getDataSource().getPropertiesProvider()
+ .computePropertiesVector(scanVariables);
+ List<ILocalStructuralProperty> localProperties = new ArrayList<>();
+ // Data needs to be sorted on [token, number of tokens, PK],
+ // or on [token, PK] if the index is not partitioned.
+ for (LogicalVariable skVar : secondaryKeys) {
+ localProperties.add(new LocalOrderProperty(new OrderColumn(skVar,
+ OrderKind.ASC)));
+ }
+ for (LogicalVariable pkVar : primaryKeys) {
+ localProperties.add(new LocalOrderProperty(new OrderColumn(pkVar,
+ OrderKind.ASC)));
+ }
+ StructuralPropertiesVector spv = new StructuralPropertiesVector(physicalProps.getPartitioningProperty(),
+ localProperties);
+ return new PhysicalRequirements(new IPhysicalPropertiesVector[] { spv },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+ /** Delivers a clone of the physical properties delivered by the first input. */
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+ /** Obtains the index-insert runtime from the metadata provider, contributes it to the job and connects it to the first input. */
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ IndexInsertDeleteOperator indexInsertDeleteOp = (IndexInsertDeleteOperator) op;
+ assert indexInsertDeleteOp.getOperation() == Kind.INSERT; // asserts are only checked when JVM assertions are enabled
+ assert indexInsertDeleteOp.isBulkload();
+
+ IMetadataProvider mp = context.getMetadataProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getIndexInsertRuntime(
+ dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys,
+ additionalFilteringKeys, null, inputDesc, context, spec, true); // NOTE(review): null sits where IndexInsertDeletePOperator passes filterExpr; final argument is true here, false there
+ builder.contributeHyracksOperator(indexInsertDeleteOp, runtimeAndConstraints.first);
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = indexInsertDeleteOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, indexInsertDeleteOp, 0);
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+
+
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
index b56d638..4bc4c06 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/IndexInsertDeletePOperator.java
@@ -108,7 +108,7 @@
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(insertDeleteOp);
if (insertDeleteOp.getOperation() == Kind.INSERT) {
runtimeAndConstraints = mp.getIndexInsertRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
- primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
+ primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec, false);
} else {
runtimeAndConstraints = mp.getIndexDeleteRuntime(dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, inputDesc, context, spec);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
index d254151..993e8d7 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/InsertDeletePOperator.java
@@ -97,7 +97,7 @@
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = null;
if (insertDeleteOp.getOperation() == Kind.INSERT) {
runtimeAndConstraints = mp.getInsertRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
- additionalFilteringKeys, inputDesc, context, spec);
+ additionalFilteringKeys, inputDesc, context, spec, false);
} else {
runtimeAndConstraints = mp.getDeleteRuntime(dataSource, propagatedSchema, typeEnv, keys, payload,
additionalFilteringKeys, inputDesc, context, spec);
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
index aa24221..ba299ac 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/SinkPOperator.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,13 +14,21 @@
*/
package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
@@ -28,6 +36,9 @@
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.misc.SinkOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
public class SinkPOperator extends AbstractPhysicalOperator {
@@ -38,30 +49,45 @@
@Override
public boolean isMicroOperator() {
- return true;
+ return false; // the sink now contributes its own operator descriptor instead of a micro-operator runtime
}
@Override
public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
- deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ AbstractLogicalOperator op2;
+ List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+ IPhysicalPropertiesVector childsProperties = null;
+ // Gather the local properties of every input; childsProperties ends up holding the last input's vector.
+ for (int i = 0; i < op.getInputs().size(); i++) {
+ op2 = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+ childsProperties = op2.getPhysicalOperator().getDeliveredProperties();
+ if (childsProperties.getLocalProperties() != null) {
+ propsLocal.addAll(childsProperties.getLocalProperties());
+ }
+ }
+
+ deliveredProperties = new StructuralPropertiesVector(childsProperties.getPartitioningProperty(), propsLocal); // NOTE(review): partitioning comes from the last input only; childsProperties is still null (NPE) when there are no inputs
}
@Override
public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
IPhysicalPropertiesVector reqdByParent) {
- return emptyUnaryRequirements();
+ return emptyUnaryRequirements(op.getInputs().size()); // sized by the number of inputs; presumably one empty requirement per input - confirm against emptyUnaryRequirements(int)
}
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
- SinkRuntimeFactory runtime = new SinkRuntimeFactory();
- builder.contributeMicroOperator(op, runtime, recDesc);
- ILogicalOperator src = op.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src, 0, op, 0);
+ // Build the sink as a SinkOperatorDescriptor constructed with the number of logical inputs, replacing the former micro-operator runtime.
+ IOperatorDescriptorRegistry spec = builder.getJobSpec();
+
+ SinkOperatorDescriptor opDesc = new SinkOperatorDescriptor(spec, op.getInputs().size());
+ contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
+ // Wire output 0 of the i-th input operator to input i of the sink.
+ for (int i = 0; i < op.getInputs().size(); i++) {
+ builder.contributeGraphEdge(op.getInputs().get(i).getValue(), 0, op, i);
+ }
}
@Override
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
new file mode 100644
index 0000000..ef8cd0a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/operators/physical/TokenizePOperator.java
@@ -0,0 +1,93 @@
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator.Kind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class TokenizePOperator extends AbstractPhysicalOperator { // physical operator that turns a TokenizeOperator into the tokenizer runtime supplied by the metadata provider
+ // The key variable lists and the index below are passed unchanged to IMetadataProvider.getTokenizerRuntime during job generation.
+ private final List<LogicalVariable> primaryKeys;
+ private final List<LogicalVariable> secondaryKeys;
+ private final IDataSourceIndex<?, ?> dataSourceIndex;
+ /** Stores the given key variable lists and index by reference; no copies are made. */
+ public TokenizePOperator(List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ IDataSourceIndex<?, ?> dataSourceIndex) {
+ this.primaryKeys = primaryKeys;
+ this.secondaryKeys = secondaryKeys;
+ this.dataSourceIndex = dataSourceIndex;
+ }
+ /** Identifies this physical operator as {@code PhysicalOperatorTag.TOKENIZE}. */
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.TOKENIZE;
+ }
+ /** Returns whatever the inherited {@code emptyUnaryRequirements()} helper produces; {@code reqdByParent} is not consulted. */
+ @Override
+ public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent) {
+ return emptyUnaryRequirements();
+ }
+ /** Sets the delivered properties to a clone of the properties delivered by input 0. */
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context)
+ throws AlgebricksException {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+ deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+ }
+ /** Contributes the tokenizer Hyracks operator for {@code op}; throws AlgebricksException unless {@code op} is a bulk-load INSERT. */
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ TokenizeOperator tokenizeOp = (TokenizeOperator) op;
+ if (tokenizeOp.getOperation() != Kind.INSERT || !tokenizeOp.isBulkload()) { // rejects deletes and non-bulk-load inserts
+ throw new AlgebricksException("Tokenize Operator only works when bulk-loading data.");
+ }
+ // Gather the type environment, job spec and input record descriptor (built from input 0) that the metadata provider needs.
+ IMetadataProvider mp = context.getMetadataProvider();
+ IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(op);
+ JobSpecification spec = builder.getJobSpec();
+ RecordDescriptor inputDesc = JobGenHelper.mkRecordDescriptor(
+ context.getTypeEnvironment(op.getInputs().get(0).getValue()), inputSchemas[0], context);
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getTokenizerRuntime(
+ dataSourceIndex, propagatedSchema, inputSchemas, typeEnv, primaryKeys, secondaryKeys, null, inputDesc,
+ context, spec, true); // per the getTokenizerRuntime signature: the null above fills the filterExpr slot and true is the bulkload flag
+ builder.contributeHyracksOperator(tokenizeOp, runtimeAndConstraints.first); // NOTE(review): no null check; a provider that returns null (e.g. a stub) causes an NPE here
+ builder.contributeAlgebricksPartitionConstraint(runtimeAndConstraints.first, runtimeAndConstraints.second);
+ ILogicalOperator src = tokenizeOp.getInputs().get(0).getValue();
+ builder.contributeGraphEdge(src, 0, tokenizeOp, 0); // connect output 0 of input 0 to input 0 of this operator
+ }
+ /** Always returns false; this operator is contributed through contributeHyracksOperator above. */
+ @Override
+ public boolean isMicroOperator() {
+ return false;
+ }
+ /** Always returns false. */
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 1ca6d26..a118ee8 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,6 +51,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -59,7 +60,8 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisitor<String, Integer> {
+public class LogicalOperatorPrettyPrintVisitor implements
+ ILogicalOperatorVisitor<String, Integer> {
ILogicalExpressionVisitor<String, Integer> exprVisitor;
@@ -67,35 +69,42 @@
exprVisitor = new LogicalExpressionPrettyPrintVisitor();
}
- public LogicalOperatorPrettyPrintVisitor(ILogicalExpressionVisitor<String, Integer> exprVisitor) {
+ public LogicalOperatorPrettyPrintVisitor(
+ ILogicalExpressionVisitor<String, Integer> exprVisitor) {
this.exprVisitor = exprVisitor;
}
@Override
- public String visitAggregateOperator(AggregateOperator op, Integer indent) throws AlgebricksException {
+ public String visitAggregateOperator(AggregateOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("aggregate ").append(op.getVariables()).append(" <- ");
+ addIndent(buffer, indent).append("aggregate ")
+ .append(op.getVariables()).append(" <- ");
pprintExprList(op.getExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitRunningAggregateOperator(RunningAggregateOperator op, Integer indent) throws AlgebricksException {
+ public String visitRunningAggregateOperator(RunningAggregateOperator op,
+ Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("running-aggregate ").append(op.getVariables()).append(" <- ");
+ addIndent(buffer, indent).append("running-aggregate ")
+ .append(op.getVariables()).append(" <- ");
pprintExprList(op.getExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Integer indent) {
+ public String visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
+ Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("empty-tuple-source");
return buffer.toString();
}
@Override
- public String visitGroupByOperator(GroupByOperator op, Integer indent) throws AlgebricksException {
+ public String visitGroupByOperator(GroupByOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("group by (");
pprintVeList(buffer, op.getGroupByList(), indent);
@@ -107,7 +116,8 @@
}
@Override
- public String visitDistinctOperator(DistinctOperator op, Integer indent) throws AlgebricksException {
+ public String visitDistinctOperator(DistinctOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("distinct " + "(");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -116,62 +126,75 @@
}
@Override
- public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent) throws AlgebricksException {
+ public String visitInnerJoinOperator(InnerJoinOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("join (").append(op.getCondition().getValue().accept(exprVisitor, indent))
- .append(")");
+ addIndent(buffer, indent)
+ .append("join (")
+ .append(op.getCondition().getValue()
+ .accept(exprVisitor, indent)).append(")");
return buffer.toString();
}
@Override
- public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Integer indent) throws AlgebricksException {
+ public String visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
+ Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("left outer join (")
- .append(op.getCondition().getValue().accept(exprVisitor, indent)).append(")");
+ addIndent(buffer, indent)
+ .append("left outer join (")
+ .append(op.getCondition().getValue()
+ .accept(exprVisitor, indent)).append(")");
return buffer.toString();
}
@Override
- public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Integer indent) {
+ public String visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
+ Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("nested tuple source");
return buffer.toString();
}
@Override
- public String visitOrderOperator(OrderOperator op, Integer indent) throws AlgebricksException {
+ public String visitOrderOperator(OrderOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("order ");
- for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op.getOrderExpressions()) {
+ for (Pair<OrderOperator.IOrder, Mutable<ILogicalExpression>> p : op
+ .getOrderExpressions()) {
String fst;
switch (p.first.getKind()) {
- case ASC: {
- fst = "ASC";
- break;
- }
- case DESC: {
- fst = "DESC";
- break;
- }
- default: {
- fst = p.first.getExpressionRef().toString();
- }
+ case ASC: {
+ fst = "ASC";
+ break;
}
- buffer.append("(" + fst + ", " + p.second.getValue().accept(exprVisitor, indent) + ") ");
+ case DESC: {
+ fst = "DESC";
+ break;
+ }
+ default: {
+ fst = p.first.getExpressionRef().toString();
+ }
+ }
+ buffer.append("(" + fst + ", "
+ + p.second.getValue().accept(exprVisitor, indent) + ") ");
}
return buffer.toString();
}
@Override
- public String visitAssignOperator(AssignOperator op, Integer indent) throws AlgebricksException {
+ public String visitAssignOperator(AssignOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("assign ").append(op.getVariables()).append(" <- ");
+ addIndent(buffer, indent).append("assign ").append(op.getVariables())
+ .append(" <- ");
pprintExprList(op.getExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitWriteOperator(WriteOperator op, Integer indent) throws AlgebricksException {
+ public String visitWriteOperator(WriteOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("write ");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -179,7 +202,8 @@
}
@Override
- public String visitDistributeResultOperator(DistributeResultOperator op, Integer indent) throws AlgebricksException {
+ public String visitDistributeResultOperator(DistributeResultOperator op,
+ Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("distribute result ");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -187,32 +211,42 @@
}
@Override
- public String visitWriteResultOperator(WriteResultOperator op, Integer indent) throws AlgebricksException {
+ public String visitWriteResultOperator(WriteResultOperator op,
+ Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("load ").append(op.getDataSource()).append(" from ")
- .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
+ addIndent(buffer, indent)
+ .append("load ")
+ .append(op.getDataSource())
+ .append(" from ")
+ .append(op.getPayloadExpression().getValue()
+ .accept(exprVisitor, indent))
+ .append(" partitioned by ");
pprintExprList(op.getKeyExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitSelectOperator(SelectOperator op, Integer indent) throws AlgebricksException {
+ public String visitSelectOperator(SelectOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("select (").append(op.getCondition().getValue().accept(exprVisitor, indent))
- .append(")");
+ addIndent(buffer, indent)
+ .append("select (")
+ .append(op.getCondition().getValue()
+ .accept(exprVisitor, indent)).append(")");
return buffer.toString();
}
@Override
public String visitProjectOperator(ProjectOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("project " + "(" + op.getVariables() + ")");
+ addIndent(buffer, indent).append(
+ "project " + "(" + op.getVariables() + ")");
return buffer.toString();
}
@Override
- public String visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent)
- throws AlgebricksException {
+ public String visitPartitioningSplitOperator(PartitioningSplitOperator op,
+ Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("partitioning-split (");
pprintExprList(op.getExpressions(), buffer, indent);
@@ -221,7 +255,8 @@
}
@Override
- public String visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException {
+ public String visitSubplanOperator(SubplanOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("subplan {");
printNestedPlans(op, indent, buffer);
@@ -232,44 +267,58 @@
public String visitUnionOperator(UnionAllOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("union");
- for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op.getVariableMappings()) {
- buffer.append(" (" + v.first + ", " + v.second + ", " + v.third + ")");
+ for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> v : op
+ .getVariableMappings()) {
+ buffer.append(" (" + v.first + ", " + v.second + ", " + v.third
+ + ")");
}
return buffer.toString();
}
@Override
- public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException {
+ public String visitUnnestOperator(UnnestOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("unnest " + op.getVariable());
if (op.getPositionalVariable() != null) {
buffer.append(" at " + op.getPositionalVariable());
}
- buffer.append(" <- " + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+ buffer.append(" <- "
+ + op.getExpressionRef().getValue().accept(exprVisitor, indent));
return buffer.toString();
}
@Override
- public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent) throws AlgebricksException {
+ public String visitUnnestMapOperator(UnnestMapOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "unnest-map " + op.getVariables() + " <- "
- + op.getExpressionRef().getValue().accept(exprVisitor, indent));
+ "unnest-map "
+ + op.getVariables()
+ + " <- "
+ + op.getExpressionRef().getValue()
+ .accept(exprVisitor, indent));
return buffer.toString();
}
@Override
- public String visitDataScanOperator(DataSourceScanOperator op, Integer indent) {
+ public String visitDataScanOperator(DataSourceScanOperator op,
+ Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "data-scan " + op.getProjectVariables() + "<-" + op.getVariables() + " <- " + op.getDataSource());
+ "data-scan " + op.getProjectVariables() + "<-"
+ + op.getVariables() + " <- " + op.getDataSource());
return buffer.toString();
}
@Override
- public String visitLimitOperator(LimitOperator op, Integer indent) throws AlgebricksException {
+ public String visitLimitOperator(LimitOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- addIndent(buffer, indent).append("limit " + op.getMaxObjects().getValue().accept(exprVisitor, indent));
+ addIndent(buffer, indent).append(
+ "limit "
+ + op.getMaxObjects().getValue()
+ .accept(exprVisitor, indent));
ILogicalExpression offset = op.getOffset().getValue();
if (offset != null) {
buffer.append(", " + offset.accept(exprVisitor, indent));
@@ -288,62 +337,91 @@
public String visitScriptOperator(ScriptOperator op, Integer indent) {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "script (in: " + op.getInputVariables() + ") (out: " + op.getOutputVariables() + ")");
+ "script (in: " + op.getInputVariables() + ") (out: "
+ + op.getOutputVariables() + ")");
return buffer.toString();
}
@Override
- public String visitReplicateOperator(ReplicateOperator op, Integer indent) throws AlgebricksException {
+ public String visitReplicateOperator(ReplicateOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("replicate ");
return buffer.toString();
}
@Override
- public String visitInsertDeleteOperator(InsertDeleteOperator op, Integer indent) throws AlgebricksException {
+ public String visitInsertDeleteOperator(InsertDeleteOperator op,
+ Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
- addIndent(buffer, indent).append(header).append(op.getDataSource()).append(" from ")
- .append(op.getPayloadExpression().getValue().accept(exprVisitor, indent)).append(" partitioned by ");
+ String header = op.getOperation() == Kind.INSERT ? "insert into "
+ : "delete from "; // any operation other than INSERT is printed as a delete
+ addIndent(buffer, indent)
+ .append(header)
+ .append(op.getDataSource())
+ .append(" from ")
+ .append(op.getPayloadExpression().getValue()
+ .accept(exprVisitor, indent))
+ .append(" partitioned by ");
pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
+ if (op.isBulkload()) { // tag bulk-load operators so they are distinguishable in the printed plan
+ buffer.append(" [bulkload]");
+ }
return buffer.toString();
}
@Override
- public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Integer indent)
+ public String visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op,
+ Integer indent) throws AlgebricksException {
+ StringBuilder buffer = new StringBuilder();
+ String header = op.getOperation() == Kind.INSERT ? "insert into "
+ : "delete from ";
+ addIndent(buffer, indent).append(header).append(op.getIndexName())
+ .append(" on ").append(op.getDataSourceIndex().getDataSource())
+ .append(" from "); // printed form: <header><index name> on <data source> from [secondary key exprs]; primary keys are no longer printed
+ pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
+ if (op.isBulkload()) { // tag bulk-load operators so they are distinguishable in the printed plan
+ buffer.append(" [bulkload]");
+ }
+ return buffer.toString();
+ }
+ /** Prints a tokenize operator as "tokenize <tokenize vars> <- [secondary key exprs]" at the given indent. */
+ @Override
+ public String visitTokenizeOperator(TokenizeOperator op, Integer indent)
throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
- String header = op.getOperation() == Kind.INSERT ? "insert into " : "delete from ";
- addIndent(buffer, indent).append(header).append(op.getDataSourceIndex()).append(" from ");
+ addIndent(buffer, indent).append("tokenize ")
+ .append(op.getTokenizeVars()).append(" <- ");
pprintExprList(op.getSecondaryKeyExpressions(), buffer, indent);
- buffer.append(" ");
- pprintExprList(op.getPrimaryKeyExpressions(), buffer, indent);
return buffer.toString();
}
@Override
- public String visitSinkOperator(SinkOperator op, Integer indent) throws AlgebricksException {
+ public String visitSinkOperator(SinkOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("sink");
return buffer.toString();
}
@Override
- public String visitExtensionOperator(ExtensionOperator op, Integer indent) throws AlgebricksException {
+ public String visitExtensionOperator(ExtensionOperator op, Integer indent)
+ throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(op.toString());
return buffer.toString();
}
- protected static final StringBuilder addIndent(StringBuilder buffer, int level) {
+ protected static final StringBuilder addIndent(StringBuilder buffer,
+ int level) {
for (int i = 0; i < level; ++i) {
buffer.append(' ');
}
return buffer;
}
- protected void printNestedPlans(AbstractOperatorWithNestedPlans op, Integer indent, StringBuilder buffer)
- throws AlgebricksException {
+ protected void printNestedPlans(AbstractOperatorWithNestedPlans op,
+ Integer indent, StringBuilder buffer) throws AlgebricksException {
boolean first = true;
if (op.getNestedPlans().isEmpty()) {
buffer.append("}");
@@ -363,8 +441,9 @@
}
}
- protected void pprintExprList(List<Mutable<ILogicalExpression>> expressions, StringBuilder buffer, Integer indent)
- throws AlgebricksException {
+ protected void pprintExprList(
+ List<Mutable<ILogicalExpression>> expressions,
+ StringBuilder buffer, Integer indent) throws AlgebricksException {
buffer.append("[");
boolean first = true;
for (Mutable<ILogicalExpression> exprRef : expressions) {
@@ -378,7 +457,9 @@
buffer.append("]");
}
- protected void pprintVeList(StringBuilder sb, List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
+ protected void pprintVeList(
+ StringBuilder sb,
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> vePairList,
Integer indent) throws AlgebricksException {
sb.append("[");
boolean fst = true;
@@ -398,11 +479,13 @@
}
@Override
- public String visitExternalDataLookupOperator(ExternalDataLookupOperator op, Integer indent)
+ public String visitExternalDataLookupOperator(
+ ExternalDataLookupOperator op, Integer indent)
throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append(
- "external-instant-lookup " + op.getVariables() + " <- " + op.getExpressionRef().getValue());
+ "external-instant-lookup " + op.getVariables() + " <- "
+ + op.getExpressionRef().getValue());
return buffer.toString();
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 3fdee94..c753042 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -40,6 +40,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
@@ -105,7 +106,9 @@
public R visitInsertDeleteOperator(InsertDeleteOperator op, T tag) throws AlgebricksException;
public R visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, T tag) throws AlgebricksException;
-
+
public R visitExternalDataLookupOperator(ExternalDataLookupOperator op, T arg) throws AlgebricksException;
+ public R visitTokenizeOperator(TokenizeOperator op, T arg) throws AlgebricksException;
+
}
diff --git a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 1fe6b15..e1f897c 100644
--- a/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/edu/uci/ics/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
index 4b965f5..099520d 100644
--- a/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
+++ b/algebricks/algebricks-examples/piglet-example/src/main/java/edu/uci/ics/hyracks/algebricks/examples/piglet/metadata/PigletMetadataProvider.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -174,11 +174,15 @@
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
- IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema,
- IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
- ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
- throws AlgebricksException {
+ IDataSourceIndex<String, String> dataSource,
+ IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalNonKeyFields,
+ ILogicalExpression filterExpr,
+ RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification spec, boolean bulkload) throws AlgebricksException { // bulkload is a new trailing parameter; this stub ignores it along with every other argument
// TODO Auto-generated method stub
return null;
}
@@ -195,6 +199,16 @@
}
@Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+ IDataSourceIndex<String, String> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
+ // TODO Auto-generated method stub: no tokenizer runtime is built here and every argument is ignored.
+ return null; // NOTE(review): TokenizePOperator.contributeRuntimeOperator reads .first/.second of the returned pair without a null check - confirm this path is never reached with this provider
+ }
+
+ @Override
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return FN_MAP.get(fid);
}
@@ -203,7 +217,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<String> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+ JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
diff --git a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 58cb2ef..bc7255b 100644
--- a/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/edu/uci/ics/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -47,14 +47,17 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AggregatePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.BulkloadPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.DistributeResultPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.EmptyTupleSourcePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeletePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InsertDeletePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
@@ -71,6 +74,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamSelectPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StringStreamingScriptPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SubplanPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.TokenizePOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.UnnestPOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.WriteResultPOperator;
@@ -288,8 +292,13 @@
additionalFilteringKeys = new ArrayList<LogicalVariable>();
getKeys(opLoad.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
- op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
- .getDataSource()));
+ if (opLoad.isBulkload()) {
+ op.setPhysicalOperator(new BulkloadPOperator(payload, keys, additionalFilteringKeys, opLoad
+ .getDataSource()));
+ } else {
+ op.setPhysicalOperator(new InsertDeletePOperator(payload, keys, additionalFilteringKeys, opLoad
+ .getDataSource()));
+ }
break;
}
case INDEX_INSERT_DELETE: {
@@ -303,8 +312,28 @@
additionalFilteringKeys = new ArrayList<LogicalVariable>();
getKeys(opInsDel.getAdditionalFilteringExpressions(), additionalFilteringKeys);
}
- op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
- additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ if (opInsDel.isBulkload()) {
+ op.setPhysicalOperator(new IndexBulkloadPOperator(primaryKeys, secondaryKeys,
+ additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ } else {
+ op.setPhysicalOperator(new IndexInsertDeletePOperator(primaryKeys, secondaryKeys,
+ additionalFilteringKeys, opInsDel.getFilterExpression(), opInsDel.getDataSourceIndex()));
+ }
+
+ break;
+
+ }
+ case TOKENIZE: {
+ TokenizeOperator opTokenize = (TokenizeOperator) op;
+ List<LogicalVariable> primaryKeys = new ArrayList<LogicalVariable>();
+ List<LogicalVariable> secondaryKeys = new ArrayList<LogicalVariable>();
+ getKeys(opTokenize.getPrimaryKeyExpressions(), primaryKeys);
+ getKeys(opTokenize.getSecondaryKeyExpressions(), secondaryKeys);
+ // Tokenization only applies to a bulk load into a data set with an index; in any other case no physical operator is set here.
+ if (opTokenize.isBulkload()) {
+ op.setPhysicalOperator(new TokenizePOperator(primaryKeys, secondaryKeys, opTokenize
+ .getDataSourceIndex()));
+ }
break;
}
case SINK: {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
deleted file mode 100644
index 62b7c96..0000000
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/context/AsterixBTreeRegistry.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2009-2013 by The Regents of the University of California
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * you may obtain a copy of the License from
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.uci.ics.hyracks.algebricks.runtime.context;
-
-import java.util.HashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
-
-public class AsterixBTreeRegistry {
-
- private HashMap<Integer, BTree> map = new HashMap<Integer, BTree>();
- private Lock registryLock = new ReentrantLock();
-
- public BTree get(int fileId) {
- return map.get(fileId);
- }
-
- // TODO: not very high concurrency, but good enough for now
- public void lock() {
- registryLock.lock();
- }
-
- public void unlock() {
- registryLock.unlock();
- }
-
- public void register(int fileId, BTree btree) {
- map.put(fileId, btree);
- }
-
- public void unregister(int fileId) {
- map.remove(fileId);
- }
-}
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index aaa6862..81052d6 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -33,13 +33,17 @@
@Override
public void close() throws HyracksDataException {
+ flushIfNotFailed();
+ writer.close();
+ appender.reset(frame, true);
+ }
+
+ protected void flushIfNotFailed() throws HyracksDataException {
if (!failed) {
if (appender.getTupleCount() > 0) {
FrameUtils.flushFrame(frame, writer);
}
}
- writer.close();
- appender.reset(frame, true);
}
protected void appendToFrameFromTupleBuilder(ArrayTupleBuilder tb) throws HyracksDataException {
diff --git a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
index 3e87f31..c0ae029 100644
--- a/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
+++ b/algebricks/algebricks-runtime/src/main/java/edu/uci/ics/hyracks/algebricks/runtime/operators/std/StreamProjectRuntimeFactory.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -71,12 +71,13 @@
}
}
if (flushFramesRapidly) {
- // Whenever all the tuples in the incoming frame have been consumed, the project operator
- // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
+ // Whenever all the tuples in the incoming frame have been consumed, the project operator
+ // will push its frame to the next operator; i.e., it won't wait until the frame gets full.
appendProjectionToFrame(t, projectionList, true);
} else {
appendProjectionToFrame(t, projectionList);
}
+
}
};
diff --git a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
index 8c72a3e..42356a5 100644
--- a/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
+++ b/hivesterix/hivesterix-runtime/src/main/java/edu/uci/ics/hivesterix/runtime/jobgen/HiveMetaDataProvider.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -95,7 +95,7 @@
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification jobSpec) throws AlgebricksException {
+ JobGenContext context, JobSpecification jobSpec, boolean bulkload) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
@@ -135,7 +135,7 @@
IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
- JobGenContext context, JobSpecification spec) throws AlgebricksException {
+ JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
}
@@ -149,4 +149,14 @@
// TODO Auto-generated method stub
return null;
}
+
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
+ IDataSourceIndex<T, S> dataSource, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
+ ILogicalExpression filterExpr, RecordDescriptor recordDesc,
+ JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
+ // TODO: unimplemented stub - ignores every argument and returns null instead of a tokenizer runtime.
+ return null;
+ }
}
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
index ac44c11..6edd647 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,7 +32,7 @@
* the offset of the (i + 1)^th tuple. Every tuple is organized as a sequence of
* ints indicating the end of each field in the tuple relative to the end of the
* field slots.
- *
+ *
* @author vinayakb
*/
public final class FrameTupleAccessor implements IFrameTupleAccessor {
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
index ad0cf93..acacd6a 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/edu/uci/ics/hyracks/dataflow/common/data/marshalling/ShortSerializerDeserializer.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
index d3d4833..ba8cfd6 100644
--- a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/file/DelimitedDataTupleParserFactory.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -82,6 +82,7 @@
throw new HyracksDataException(e);
}
}
+
};
}
diff --git a/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
new file mode 100644
index 0000000..fa0d874
--- /dev/null
+++ b/hyracks/hyracks-dataflow-std/src/main/java/edu/uci/ics/hyracks/dataflow/std/misc/SinkOperatorDescriptor.java
@@ -0,0 +1,41 @@
+package edu.uci.ics.hyracks.dataflow.std.misc;
+
+import java.nio.ByteBuffer;
+
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
+import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+
+public class SinkOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { // runtime discards every frame it receives
+ private static final long serialVersionUID = 1L;
+ /** Passes {@code spec}, {@code nInput} and a literal 0 to the superclass (presumably input/output arity - TODO confirm). */
+ public SinkOperatorDescriptor(IOperatorDescriptorRegistry spec, int nInput) {
+ super(spec, nInput, 0);
+ }
+ /** Returns a pushable whose callbacks are all empty; none of the four arguments is read. */
+ @Override
+ public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ return new AbstractUnaryInputSinkOperatorNodePushable() { // NOTE(review): unary-input base although nInput may exceed 1 - confirm
+ @Override
+ public void open() throws HyracksDataException {
+ }
+ // Empty body: the incoming frame is dropped without being read.
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ }
+ // Empty body: open() and nextFrame() keep no state, so nothing is released here.
+ @Override
+ public void close() throws HyracksDataException {
+ }
+ // Empty body: a failure notification triggers no action in this runtime.
+ @Override
+ public void fail() throws HyracksDataException {
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
index 1c1901d..3ea0ad9 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/PrimaryIndexBulkLoadExample.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,6 +38,7 @@
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.DataGenOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.IndexLifecycleManagerProvider;
@@ -46,12 +47,11 @@
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
// This example will load a primary index from randomly generated data
-public class PrimaryIndexBulkLoadExample {
+public class PrimaryIndexBulkLoadExample {
private static class Options {
@Option(name = "-host", usage = "Hyracks Cluster Controller Host name", required = true)
public String host;
@@ -70,7 +70,7 @@
@Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
public int sbSize = 32768;
-
+
@Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
public int frameSize = 32768;
}
@@ -150,10 +150,10 @@
// etc.
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.btreeName);
IIndexDataflowHelperFactory dataflowHelperFactory = new BTreeDataflowHelperFactory();
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, recDesc,
storageManager, lcManagerProvider, btreeSplitProvider, typeTraits, comparatorFactories, null,
- fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory);
+
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
// distribute the records from the datagen via hashing to the bulk load
@@ -162,12 +162,13 @@
hashFactories[0] = PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY);
IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(new int[] { 0 }, hashFactories));
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ JobHelper.createPartitionConstraint(spec, nsOpDesc, splitNCs);
spec.connect(hashConn, dataGen, 0, sorter, 0);
-
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
-
- spec.addRoot(btreeBulkLoad);
+ spec.connect(new OneToOneConnectorDescriptor(spec), btreeBulkLoad, 0, nsOpDesc, 0);
+ spec.addRoot(nsOpDesc);
return spec;
}
diff --git a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
index 4826808..62d0487 100644
--- a/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
+++ b/hyracks/hyracks-examples/btree-example/btreeclient/src/main/java/edu/uci/ics/hyracks/examples/btree/client/SecondaryIndexBulkLoadExample.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -32,6 +32,7 @@
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.examples.btree.helper.IndexLifecycleManagerProvider;
import edu.uci.ics.hyracks.examples.btree.helper.StorageManagerInterface;
@@ -65,7 +66,7 @@
@Option(name = "-sortbuffer-size", usage = "Sort buffer size in frames (default: 32768)", required = false)
public int sbSize = 32768;
-
+
@Option(name = "-frame-size", usage = "Hyracks frame size (default: 32768)", required = false)
public int frameSize = 32768;
}
@@ -142,19 +143,18 @@
// tuple
int[] fieldPermutation = { 1, 0 };
IFileSplitProvider btreeSplitProvider = JobHelper.createFileSplitProvider(splitNCs, options.secondaryBTreeName);
- TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec, null,
storageManager, lcManagerProvider, btreeSplitProvider, secondaryTypeTraits, comparatorFactories, null,
- fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, 0.7f, false, 1000L, true, dataflowHelperFactory);
JobHelper.createPartitionConstraint(spec, btreeBulkLoad, splitNCs);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ JobHelper.createPartitionConstraint(spec, nsOpDesc, splitNCs);
// connect the ops
-
spec.connect(new OneToOneConnectorDescriptor(spec), btreeScanOp, 0, sorter, 0);
-
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
-
- spec.addRoot(btreeBulkLoad);
+ spec.connect(new OneToOneConnectorDescriptor(spec), btreeBulkLoad, 0, nsOpDesc, 0);
+ spec.addRoot(nsOpDesc);
return spec;
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index 19b749a..1eb0ee9 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -177,16 +177,19 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
- primaryBloomFilterKeyFields, fieldPermutation, 0.7f, true, 1000L, true, dataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ primaryRecDesc, storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, primaryBloomFilterKeyFields, fieldPermutation, 0.7f, true, 1000L, true,
+ dataflowHelperFactory);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
-
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, primaryBtreeBulkLoad, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeBulkLoad, 0, nsOpDesc, 0);
- spec.addRoot(primaryBtreeBulkLoad);
+ spec.addRoot(nsOpDesc);
runTest(spec);
}
@@ -240,16 +243,20 @@
// load secondary index
int[] fieldPermutation = { 3, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
+ secondaryRecDesc, storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
secondaryComparatorFactories, secondaryBloomFilterKeyFields, fieldPermutation, 0.7f, true, 1000L, true,
- dataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ dataflowHelperFactory);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBtreeBulkLoad, NC1_ID);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryBtreeSearchOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), primaryBtreeSearchOp, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, secondaryBtreeBulkLoad, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBtreeBulkLoad, 0, nsOpDesc, 0);
- spec.addRoot(secondaryBtreeBulkLoad);
+ spec.addRoot(nsOpDesc);
runTest(spec);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
index cfe6db4..2aa037d 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/AbstractfWordInvertedIndexTest.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -51,6 +51,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -208,9 +209,8 @@
private IOperatorDescriptor createPrimaryBulkLoadOp(JobSpecification spec) {
int[] fieldPermutation = { 0, 1 };
TreeIndexBulkLoadOperatorDescriptor primaryBtreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
- primaryComparatorFactories, null, fieldPermutation, 0.7f, true, 1000L, true,
- btreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ primaryRecDesc, storageManager, lcManagerProvider, primaryFileSplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, null, fieldPermutation, 0.7f, true, 1000L, true, btreeDataflowHelperFactory);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBtreeBulkLoad, NC1_ID);
return primaryBtreeBulkLoad;
}
@@ -249,8 +249,10 @@
// before bulk load.
IOperatorDescriptor fileScanOp = createFileScanOp(spec);
IOperatorDescriptor primaryBulkLoad = createPrimaryBulkLoadOp(spec);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
spec.connect(new OneToOneConnectorDescriptor(spec), fileScanOp, 0, primaryBulkLoad, 0);
- spec.addRoot(primaryBulkLoad);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryBulkLoad, 0, nsOpDesc, 0);
+ spec.addRoot(nsOpDesc);
runTest(spec);
}
@@ -277,16 +279,16 @@
private IOperatorDescriptor createBinaryTokenizerOp(JobSpecification spec, int docField, int[] keyFields) {
BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
- tokenizerRecDesc, tokenizerFactory, docField, keyFields, addNumTokensKey());
+ tokenizerRecDesc, tokenizerFactory, docField, keyFields, addNumTokensKey(), false);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
return binaryTokenizer;
}
private IOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec, int[] fieldPermutation) {
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
- spec, fieldPermutation, true, 1000L, true, storageManager, btreeFileSplitProvider, lcManagerProvider,
- tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
- tokenizerFactory, invertedIndexDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ spec, tokenizerRecDesc, fieldPermutation, true, 1000L, true, storageManager, btreeFileSplitProvider,
+ lcManagerProvider, tokenTypeTraits, tokenComparatorFactories, invListsTypeTraits,
+ invListsComparatorFactories, tokenizerFactory, invertedIndexDataflowHelperFactory);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, invIndexBulkLoadOp, NC1_ID);
return invIndexBulkLoadOp;
}
@@ -306,11 +308,14 @@
}
IOperatorDescriptor externalSortOp = createExternalSortOp(spec, sortFields, tokenizerRecDesc);
IOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec, fieldPermutation);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, binaryTokenizerOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), binaryTokenizerOp, 0, externalSortOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), externalSortOp, 0, invIndexBulkLoadOp, 0);
- spec.addRoot(invIndexBulkLoadOp);
+ spec.connect(new OneToOneConnectorDescriptor(spec), invIndexBulkLoadOp, 0, nsOpDesc, 0);
+ spec.addRoot(nsOpDesc);
runTest(spec);
}
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
index ba00691..268c3cb 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/invertedindex/BinaryTokenizerOperatorTest.java
@@ -85,7 +85,7 @@
tokenFactory);
int[] keyFields = { 0 };
BinaryTokenizerOperatorDescriptor binaryTokenizer = new BinaryTokenizerOperatorDescriptor(spec,
- tokenizerRecDesc, tokenizerFactory, 1, keyFields, addNumTokensKey);
+ tokenizerRecDesc, tokenizerFactory, 1, keyFields, addNumTokensKey, false);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, binaryTokenizer, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new FileSplit[] { new FileSplit(NC1_ID,
diff --git a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 2bdf652..8a335b5 100644
--- a/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++ b/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/edu/uci/ics/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -241,16 +241,21 @@
int[] fieldPermutation = { 0, 1, 2, 4, 5, 7, 9, 10, 11, 12 };
TreeIndexBulkLoadOperatorDescriptor primaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits, primaryComparatorFactories,
- null, fieldPermutation, 0.7f, false, 1000L, true, btreeDataflowHelperFactory,
- NoOpOperationCallbackFactory.INSTANCE);
+ primaryRecDesc, storageManager, lcManagerProvider, primarySplitProvider, primaryTypeTraits,
+ primaryComparatorFactories, null, fieldPermutation, 0.7f, false, 1000L, true,
+ btreeDataflowHelperFactory);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, primaryBulkLoad, NC1_ID);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
spec.connect(new OneToOneConnectorDescriptor(spec), ordScanner, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, primaryBulkLoad, 0);
- spec.addRoot(primaryBulkLoad);
+ spec.connect(new OneToOneConnectorDescriptor(spec), primaryBulkLoad, 0, nsOpDesc, 0);
+
+ spec.addRoot(nsOpDesc);
runTest(spec);
}
@@ -298,15 +303,19 @@
// load secondary index
int[] fieldPermutation = { 6, 7, 8, 9, 0 };
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
+ secondaryRecDesc, storageManager, lcManagerProvider, secondarySplitProvider, secondaryTypeTraits,
secondaryComparatorFactories, null, fieldPermutation, 0.7f, false, 1000L, true,
- rtreeDataflowHelperFactory, NoOpOperationCallbackFactory.INSTANCE);
+ rtreeDataflowHelperFactory);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, secondaryBulkLoad, NC1_ID);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID);
+
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primarySearchOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), primarySearchOp, 0, secondaryBulkLoad, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), secondaryBulkLoad, 0, nsOpDesc, 0);
- spec.addRoot(secondaryBulkLoad);
+ spec.addRoot(nsOpDesc);
runTest(spec);
}
diff --git a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
index 45b24c5..e46e685 100644
--- a/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
+++ b/hyracks/hyracks-examples/text-example/texthelper/src/main/java/edu/uci/ics/hyracks/examples/text/WordTupleParserFactory.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
index 81c13a1..9f3c5dd 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/IndexBulkLoadOperatorNodePushable.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,14 +21,16 @@
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexBulkLoader;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
-public class IndexBulkLoadOperatorNodePushable extends AbstractUnaryInputSinkOperatorNodePushable {
+public class IndexBulkLoadOperatorNodePushable extends
+ AbstractUnaryInputUnaryOutputOperatorNodePushable {
protected final IIndexOperatorDescriptor opDesc;
protected final IHyracksTaskContext ctx;
protected final float fillFactor;
@@ -42,46 +44,57 @@
protected IRecordDescriptorProvider recDescProvider;
protected PermutingFrameTupleReference tuple = new PermutingFrameTupleReference();
- public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition,
- int[] fieldPermutation, float fillFactor, boolean verifyInput, long numElementsHint,
- boolean checkIfEmptyIndex, IRecordDescriptorProvider recordDescProvider) {
+ public IndexBulkLoadOperatorNodePushable(IIndexOperatorDescriptor opDesc,
+ IHyracksTaskContext ctx, int partition, int[] fieldPermutation,
+ float fillFactor, boolean verifyInput, long numElementsHint,
+ boolean checkIfEmptyIndex,
+ IRecordDescriptorProvider recordDescProvider) {
this.opDesc = opDesc;
this.ctx = ctx;
- this.indexHelper = opDesc.getIndexDataflowHelperFactory().createIndexDataflowHelper(opDesc, ctx, partition);
+ this.indexHelper = opDesc.getIndexDataflowHelperFactory()
+ .createIndexDataflowHelper(opDesc, ctx, partition);
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
this.numElementsHint = numElementsHint;
this.checkIfEmptyIndex = checkIfEmptyIndex;
this.recDescProvider = recordDescProvider;
tuple.setFieldPermutation(fieldPermutation);
+
}
@Override
public void open() throws HyracksDataException {
- RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
+ RecordDescriptor recDesc = recDescProvider.getInputRecordDescriptor(
+ opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(ctx.getFrameSize(), recDesc);
indexHelper.open();
index = indexHelper.getIndexInstance();
try {
- bulkLoader = index.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
+ bulkLoader = index.createBulkLoader(fillFactor, verifyInput,
+ numElementsHint, checkIfEmptyIndex);
} catch (Exception e) {
indexHelper.close();
throw new HyracksDataException(e);
}
+ writer.open();
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
+
for (int i = 0; i < tupleCount; i++) {
tuple.reset(accessor, i);
+
try {
bulkLoader.add(tuple);
} catch (IndexException e) {
throw new HyracksDataException(e);
}
}
+ FrameUtils.flushFrame(buffer, writer);
+
}
@Override
@@ -93,6 +106,7 @@
} finally {
indexHelper.close();
}
+ writer.close();
}
@Override
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
index e8db013..92e178a 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/dataflow/TreeIndexBulkLoadOperatorDescriptor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +20,10 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
import edu.uci.ics.hyracks.storage.common.file.NoOpLocalResourceFactoryProvider;
@@ -38,17 +38,16 @@
private final long numElementsHint;
private final boolean checkIfEmptyIndex;
- public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ public TreeIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
IStorageManagerInterface storageManager, IIndexLifecycleManagerProvider lifecycleManagerProvider,
IFileSplitProvider fileSplitProvider, ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] comparatorFactories, int[] bloomFilterKeyFields, int[] fieldPermutation,
float fillFactor, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
- IIndexDataflowHelperFactory dataflowHelperFactory,
- IModificationOperationCallbackFactory modificationOpCallbackFactory) {
- super(spec, 1, 0, null, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
- comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false,
- false, null,
- NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+ IIndexDataflowHelperFactory dataflowHelperFactory) {
+ super(spec, 1, 1, recDesc, storageManager, lifecycleManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, bloomFilterKeyFields, dataflowHelperFactory, null, false, false, null,
+ NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.fieldPermutation = fieldPermutation;
this.fillFactor = fillFactor;
this.verifyInput = verifyInput;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
index 71e2b9f..d23d484 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/frames/LIFOMetaDataFrame.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -37,10 +37,10 @@
protected static final int nextPageOff = levelOff + 1; // 21
protected static final int validOff = nextPageOff + 4; // 25
- // The additionalFilteringPageOff is used only for LSM indexes.
+ // The additionalFilteringPageOff is used only for LSM indexes.
// We store the page id that will be used to store the information of the the filter that is associated with a disk component.
// It is only set in the first meta page other meta pages (i.e., with level -2) have junk in the max page field.
- private static final int additionalFilteringPageOff = validOff + 4; // 29
+ private static final int additionalFilteringPageOff = validOff + 4; // 29
protected static final int lsnOff = additionalFilteringPageOff + 4; // 33
protected ICachedPage page = null;
diff --git a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java
index 5a60b09..c33eaab 100644
--- a/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java
+++ b/hyracks/hyracks-storage-am-common/src/main/java/edu/uci/ics/hyracks/storage/am/common/impls/AbstractSearchPredicate.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java
index e2b5584..85c293d 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/frames/LSMComponentFilterFrame.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,7 +27,7 @@
// This page consists of two tuples that represents the minimum and maximum tuples in an LSM component.
- // A-one byte to indicate whether the filter tuples were set yet.
+ // A-one byte to indicate whether the filter tuples were set yet.
private static final int minTupleIsSetIndicatorOff = 0;
private static final int maxTupleIsSetIndicatorOff = 1;
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
index e2c31aa..682338c 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/AbstractLSMComponent.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -27,7 +27,7 @@
this.filter = filter;
readerCount = 0;
}
-
+
public AbstractLSMComponent() {
this(null);
}
diff --git a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
index bd54769..7ce8907 100644
--- a/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
+++ b/hyracks/hyracks-storage-am-lsm-common/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -54,7 +54,7 @@
int fileId = treeIndex.getFileId();
ITreeIndexMetaDataFrame metadataFrame = treeIndex.getFreePageManager().getMetaDataFrameFactory().createFrame();
- // Read the filter page from the first metadata page of the tree.
+ // Read the filter page from the first metadata page of the tree.
// If it is has not been created yet, then create a new one.
int metadataPageId = treeIndex.getFreePageManager().getFirstMetadataPage();
ICachedPage metadataPage = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, metadataPageId), false);
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
index 6ccfed3..648c51a 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorDescriptor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,15 +38,20 @@
// Indicates whether the first key field should be the number of tokens in the tokenized set of the document.
// This value is used in partitioned inverted indexes, for example.
private final boolean addNumTokensKey;
+ // Indicates the order in which the fields of each output tuple are written.
+ // True: [keyfield1, ..., keyfieldN, token, number of tokens (only if addNumTokensKey is set)]
+ // False: [token, number of tokens (only if addNumTokensKey is set), keyfield1, ..., keyfieldN]
+ private final boolean writeKeyFieldsFirst;
public BinaryTokenizerOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
- IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey) {
+ IBinaryTokenizerFactory tokenizerFactory, int docField, int[] keyFields, boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
super(spec, 1, 1);
this.tokenizerFactory = tokenizerFactory;
this.docField = docField;
this.keyFields = keyFields;
this.addNumTokensKey = addNumTokensKey;
recordDescriptors[0] = recDesc;
+ this.writeKeyFieldsFirst = writeKeyFieldsFirst;
}
@Override
@@ -54,6 +59,6 @@
IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new BinaryTokenizerOperatorNodePushable(ctx, recordDescProvider.getInputRecordDescriptor(
getActivityId(), 0), recordDescriptors[0], tokenizerFactory.createTokenizer(), docField, keyFields,
- addNumTokensKey);
+ addNumTokensKey, writeKeyFieldsFirst);
}
}
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
index ad2022d..8598b70 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/BinaryTokenizerOperatorNodePushable.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,9 +15,11 @@
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow;
+import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
@@ -25,18 +27,22 @@
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
+import edu.uci.ics.hyracks.dataflow.common.util.IntSerDeUtils;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IToken;
-public class BinaryTokenizerOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
+public class BinaryTokenizerOperatorNodePushable extends
+ AbstractUnaryInputUnaryOutputOperatorNodePushable {
private final IHyracksTaskContext ctx;
private final IBinaryTokenizer tokenizer;
private final int docField;
private final int[] keyFields;
private final boolean addNumTokensKey;
+ private final boolean writeKeyFieldsFirst;
private final RecordDescriptor inputRecDesc;
private final RecordDescriptor outputRecDesc;
@@ -46,9 +52,10 @@
private FrameTupleAppender appender;
private ByteBuffer writeBuffer;
- public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecDesc,
- RecordDescriptor outputRecDesc, IBinaryTokenizer tokenizer, int docField, int[] keyFields,
- boolean addNumTokensKey) {
+ public BinaryTokenizerOperatorNodePushable(IHyracksTaskContext ctx,
+ RecordDescriptor inputRecDesc, RecordDescriptor outputRecDesc,
+ IBinaryTokenizer tokenizer, int docField, int[] keyFields,
+ boolean addNumTokensKey, boolean writeKeyFieldsFirst) {
this.ctx = ctx;
this.tokenizer = tokenizer;
this.docField = docField;
@@ -56,6 +63,7 @@
this.addNumTokensKey = addNumTokensKey;
this.inputRecDesc = inputRecDesc;
this.outputRecDesc = outputRecDesc;
+ this.writeKeyFieldsFirst = writeKeyFieldsFirst;
}
@Override
@@ -73,14 +81,17 @@
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
accessor.reset(buffer);
int tupleCount = accessor.getTupleCount();
+
for (int i = 0; i < tupleCount; i++) {
short numTokens = 0;
if (addNumTokensKey) {
// Run through the tokens to get the total number of tokens.
tokenizer.reset(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+ accessor.getTupleStartOffset(i)
+ + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(i, docField),
+ accessor.getFieldLength(i, docField));
while (tokenizer.hasNext()) {
tokenizer.next();
numTokens++;
@@ -89,38 +100,81 @@
tokenizer.reset(
accessor.getBuffer().array(),
- accessor.getTupleStartOffset(i) + accessor.getFieldSlotsLength()
- + accessor.getFieldStartOffset(i, docField), accessor.getFieldLength(i, docField));
+ accessor.getTupleStartOffset(i)
+ + accessor.getFieldSlotsLength()
+ + accessor.getFieldStartOffset(i, docField),
+ accessor.getFieldLength(i, docField));
+
+ // Write the token and the key fields into the output frame in the order
+ // selected by the writeKeyFieldsFirst flag.
while (tokenizer.hasNext()) {
+
tokenizer.next();
builder.reset();
- try {
- IToken token = tokenizer.getToken();
- token.serializeToken(builderData);
- builder.addFieldEndOffset();
- // Add number of tokens if requested.
- if (addNumTokensKey) {
- builder.getDataOutput().writeShort(numTokens);
+
+ // Writing order: token, number of tokens (if addNumTokensKey), keyfield1 ... keyfieldN
+ if (!writeKeyFieldsFirst) {
+ try {
+ IToken token = tokenizer.getToken();
+ token.serializeToken(builderData);
+
builder.addFieldEndOffset();
+ // Add number of tokens if requested.
+ if (addNumTokensKey) {
+ builder.getDataOutput().writeShort(numTokens);
+ builder.addFieldEndOffset();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e.getMessage());
}
- } catch (IOException e) {
- throw new HyracksDataException(e.getMessage());
+
+ for (int k = 0; k < keyFields.length; k++) {
+ builder.addField(accessor, i, keyFields[k]);
+ }
+
+ }
+ // Writing order: keyfield1 ... keyfieldN, token, number of tokens (if addNumTokensKey)
+ else {
+
+ for (int k = 0; k < keyFields.length; k++) {
+ builder.addField(accessor, i, keyFields[k]);
+ }
+
+ try {
+ IToken token = tokenizer.getToken();
+ token.serializeToken(builderData);
+
+ builder.addFieldEndOffset();
+ // Add number of tokens if requested.
+ if (addNumTokensKey) {
+ builder.getDataOutput().writeShort(numTokens);
+ builder.addFieldEndOffset();
+ }
+ } catch (IOException e) {
+ throw new HyracksDataException(e.getMessage());
+ }
+
}
- for (int k = 0; k < keyFields.length; k++) {
- builder.addField(accessor, i, keyFields[k]);
- }
-
- if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
+ if (!appender.append(builder.getFieldEndOffsets(),
+ builder.getByteArray(), 0, builder.getSize())) {
FrameUtils.flushFrame(writeBuffer, writer);
appender.reset(writeBuffer, true);
- if (!appender.append(builder.getFieldEndOffsets(), builder.getByteArray(), 0, builder.getSize())) {
- throw new HyracksDataException("Record size (" + builder.getSize() +") larger than frame size (" + appender.getBuffer().capacity() + ")");
+
+ if (!appender.append(builder.getFieldEndOffsets(),
+ builder.getByteArray(), 0, builder.getSize())) {
+ throw new HyracksDataException("Record size ("
+ + builder.getSize()
+ + ") larger than frame size ("
+ + appender.getBuffer().capacity() + ")");
}
}
+
}
+
}
+
}
@Override
diff --git a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
index 1f3ed76..10ad7fe 100644
--- a/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
+++ b/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/edu/uci/ics/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexBulkLoadOperatorDescriptor.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,10 +20,10 @@
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
@@ -40,18 +40,18 @@
private final long numElementsHint;
private final boolean checkIfEmptyIndex;
- public LSMInvertedIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, int[] fieldPermutation,
- boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
+ public LSMInvertedIndexBulkLoadOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor recDesc,
+ int[] fieldPermutation, boolean verifyInput, long numElementsHint, boolean checkIfEmptyIndex,
IStorageManagerInterface storageManager, IFileSplitProvider fileSplitProvider,
IIndexLifecycleManagerProvider lifecycleManagerProvider, ITypeTraits[] tokenTypeTraits,
IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] invListsTypeTraits,
IBinaryComparatorFactory[] invListComparatorFactories, IBinaryTokenizerFactory tokenizerFactory,
- IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory,
- IModificationOperationCallbackFactory modificationOpCallbackFactory) {
- super(spec, 1, 0, null, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
+ IIndexDataflowHelperFactory invertedIndexDataflowHelperFactory) {
+ super(spec, 1, 1, recDesc, storageManager, fileSplitProvider, lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListComparatorFactories, tokenizerFactory,
- invertedIndexDataflowHelperFactory, null, false, false,
- null, NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE, modificationOpCallbackFactory);
+ invertedIndexDataflowHelperFactory, null, false, false, null,
+ NoOpLocalResourceFactoryProvider.INSTANCE, NoOpOperationCallbackFactory.INSTANCE,
+ NoOpOperationCallbackFactory.INSTANCE);
this.fieldPermutation = fieldPermutation;
this.verifyInput = verifyInput;
this.numElementsHint = numElementsHint;
diff --git a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
index 0c96324..462df4d 100644
--- a/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
+++ b/pregelix/pregelix-core/src/main/java/edu/uci/ics/pregelix/core/jobgen/JobGen.java
@@ -70,6 +70,7 @@
import edu.uci.ics.hyracks.dataflow.std.group.IAggregatorDescriptorFactory;
import edu.uci.ics.hyracks.dataflow.std.group.external.ExternalGroupOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.Algorithm;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.hdfs2.dataflow.HDFSReadOperatorDescriptor;
@@ -575,17 +576,21 @@
typeTraits[0] = new TypeTraits(false);
typeTraits[1] = new TypeTraits(false);
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
- storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories,
- sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, BF_HINT, false,
- getIndexDataflowHelperFactory(), NoOpOperationCallbackFactory.INSTANCE);
+ recordDescriptor, storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits,
+ comparatorFactories, sortFields, fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, true, BF_HINT, false,
+ getIndexDataflowHelperFactory());
setLocationConstraint(spec, btreeBulkLoad);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ setLocationConstraint(spec, nsOpDesc);
+
/**
* connect operator descriptors
*/
ITuplePartitionComputerFactory hashPartitionComputerFactory = getVertexPartitionComputerFactory();
spec.connect(new MToNPartitioningConnectorDescriptor(spec, hashPartitionComputerFactory), scanner, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
+ spec.connect(new OneToOneConnectorDescriptor(spec), btreeBulkLoad, 0, nsOpDesc, 0);
spec.setFrameSize(frameSize);
return spec;
}
diff --git a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
index 5478ed9..c600c93 100644
--- a/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
+++ b/pregelix/pregelix-core/src/test/java/edu/uci/ics/pregelix/core/join/JoinTest.java
@@ -3,9 +3,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -48,6 +48,7 @@
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import edu.uci.ics.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
@@ -280,18 +281,20 @@
ITypeTraits[] typeTraits = new ITypeTraits[custDesc.getFields().length];
for (int i = 0; i < typeTraits.length; i++)
typeTraits[i] = new TypeTraits(false);
- TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec,
+ TreeIndexBulkLoadOperatorDescriptor writer = new TreeIndexBulkLoadOperatorDescriptor(spec, custDesc,
storageManagerInterface, lcManagerProvider, fileSplitProvider, typeTraits, comparatorFactories, null,
- fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 100000, false, new BTreeDataflowHelperFactory(),
- NoOpOperationCallbackFactory.INSTANCE);
+ fieldPermutation, DEFAULT_BTREE_FILL_FACTOR, false, 100000, false, new BTreeDataflowHelperFactory());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, writer, NC1_ID, NC2_ID);
+ NullSinkOperatorDescriptor nsOpDesc = new NullSinkOperatorDescriptor(spec);
+ PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, nsOpDesc, NC1_ID, NC2_ID);
+
spec.connect(new OneToOneConnectorDescriptor(spec), custScanner, 0, sorter, 0);
spec.connect(new MToNPartitioningMergingConnectorDescriptor(spec, new FieldHashPartitionComputerFactory(
sortFields, new IBinaryHashFunctionFactory[] { stringHashFactory }), sortFields, comparatorFactories,
nmkFactory), sorter, 0, writer, 0);
-
- spec.addRoot(writer);
+ spec.connect(new OneToOneConnectorDescriptor(spec), writer, 0, nsOpDesc, 0);
+ spec.addRoot(nsOpDesc);
runTest(spec);
}