Modified asterix to work with the Algebricks change to use IExpressionRuntimeProvider natively
git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_stabilization@405 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
index bdb4c97..2310ba5 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ConstantFoldingRule.java
@@ -50,6 +50,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
@@ -59,12 +60,13 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
public class ConstantFoldingRule implements IAlgebraicRewriteRule {
@@ -104,8 +106,9 @@
AqlBinaryComparatorFactoryProvider.INSTANCE, AqlTypeTraitProvider.INSTANCE,
AqlBinaryBooleanInspectorImpl.INSTANCE, AqlBinaryIntegerInspector.INSTANCE,
AqlPrinterFactoryProvider.INSTANCE, AqlNullWriterFactory.INSTANCE, null,
- AqlLogicalExpressionJobGen.INSTANCE, AqlExpressionTypeComputer.INSTANCE, AqlNullableTypeComputer.INSTANCE,
- null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null);
+ new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(AqlLogicalExpressionJobGen.INSTANCE),
+ AqlExpressionTypeComputer.INSTANCE, AqlNullableTypeComputer.INSTANCE, null, null, null,
+ GlobalConfig.DEFAULT_FRAME_SIZE, null);
private static final IOperatorSchema[] _emptySchemas = new IOperatorSchema[] {};
@@ -127,7 +130,7 @@
private class ConstantFoldingVisitor implements ILogicalExpressionVisitor<Pair<Boolean, ILogicalExpression>, Void>,
ILogicalExpressionReferenceTransform {
- private ArrayBackedValueStorage resStore = new ArrayBackedValueStorage();
+ private IPointable p = VoidPointable.FACTORY.createPointable();
private ByteBufferInputStream bbis = new ByteBufferInputStream();
private DataInputStream dis = new DataInputStream(bbis);
@@ -175,16 +178,15 @@
return new Pair<Boolean, ILogicalExpression>(changed, expr);
}
}
- ICopyEvaluatorFactory fact = _jobGenCtx.getExpressionJobGen().createEvaluatorFactory(expr, _emptyTypeEnv,
- _emptySchemas, _jobGenCtx);
- ICopyEvaluator eval = fact.createEvaluator(resStore);
- resStore.reset();
- eval.evaluate(null);
+ IScalarEvaluatorFactory fact = _jobGenCtx.getExpressionRuntimeProvider().createEvaluatorFactory(expr,
+ _emptyTypeEnv, _emptySchemas, _jobGenCtx);
+ IScalarEvaluator eval = fact.createScalarEvaluator();
+ eval.evaluate(null, p);
Object t = _emptyTypeEnv.getType(expr);
-
+
@SuppressWarnings("rawtypes")
ISerializerDeserializer serde = _jobGenCtx.getSerializerDeserializerProvider().getSerializerDeserializer(t);
- bbis.setByteBuffer(ByteBuffer.wrap(resStore.getByteArray(), resStore.getStartOffset(), resStore.getLength()), 0);
+ bbis.setByteBuffer(ByteBuffer.wrap(p.getByteArray(), p.getStartOffset(), p.getLength()), 0);
IAObject o;
try {
o = (IAObject) serde.deserialize(dis);
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
index 7f6769e..25aea8f 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/api/common/APIFramework.java
@@ -59,6 +59,7 @@
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.AbstractRuleController;
@@ -512,7 +513,8 @@
builder.setBinaryIntegerInspector(format.getBinaryIntegerInspector());
builder.setClusterLocations(clusterLocs);
builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider());
- builder.setExprJobGen(AqlLogicalExpressionJobGen.INSTANCE);
+ builder.setExpressionRuntimeProvider(new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(
+ AqlLogicalExpressionJobGen.INSTANCE));
builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider());
builder.setNullWriterFactory(format.getNullWriterFactory());
builder.setPrinterProvider(format.getPrinterFactoryProvider());
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
index e996c1a..5aa0f6e 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/DatasetOperations.java
@@ -42,11 +42,13 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
@@ -139,8 +141,8 @@
}
// TODO: Lots of common code in this file. Refactor everything after merging in asterix-fix-issue-9.
- public static JobSpecification createDatasetJobSpec(String datasetName,
- AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+ public static JobSpecification createDatasetJobSpec(String datasetName, AqlCompiledMetadataDeclarations metadata)
+ throws AsterixException, AlgebricksException {
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AsterixException("Could not find dataset " + datasetName);
@@ -159,15 +161,15 @@
LOGGER.info("CREATING File Splits: " + sb.toString());
IIndexRegistryProvider<IIndex> indexRegistryProvider = AsterixIndexRegistryProvider.INSTANCE;
IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
- TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
- storageManager, indexRegistryProvider, splitsAndConstraint.first, typeTraits, comparatorFactories,
+ TreeIndexCreateOperatorDescriptor indexCreateOp = new TreeIndexCreateOperatorDescriptor(spec, storageManager,
+ indexRegistryProvider, splitsAndConstraint.first, typeTraits, comparatorFactories,
new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
splitsAndConstraint.second);
spec.addRoot(indexCreateOp);
return spec;
}
-
+
public static Job createLoadDatasetJobSpec(CompiledLoadFromFileStatement loadStmt,
AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
String datasetName = loadStmt.getDatasetName();
@@ -286,7 +288,12 @@
outColumns[i] = i + 1;
projectionList[i + 1] = i + 1;
}
- return new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[evalFactories.length];
+ for (int i = 0; i < evalFactories.length; ++i) {
+ sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+ evalFactories[i]);
+ }
+ return new AssignRuntimeFactory(outColumns, sefs, projectionList);
}
private static RecordDescriptor computePayloadKeyRecordDescriptor(AqlCompiledDatasetDecl compiledDatasetDecl,
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
index 3fd75d0..4d7cbe2 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/FeedOperations.java
@@ -37,8 +37,10 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
@@ -144,7 +146,12 @@
outColumns[i] = i + 1;
projectionList[i + 1] = i + 1;
}
- return new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[evalFactories.length];
+ for (int i = 0; i < evalFactories.length; ++i) {
+ sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+ evalFactories[i]);
+ }
+ return new AssignRuntimeFactory(outColumns, sefs, projectionList);
}
@SuppressWarnings("unchecked")
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
index bffd1c4..5a4d6c9 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryIndexCreator.java
@@ -27,11 +27,13 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.evaluators.ColumnAccessEvalFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
@@ -59,13 +61,13 @@
// not just for creation.
public abstract class SecondaryIndexCreator {
protected final PhysicalOptimizationConfig physOptConf;
-
+
protected int numPrimaryKeys;
protected int numSecondaryKeys;
protected AqlCompiledMetadataDeclarations metadata;
protected String datasetName;
protected AqlCompiledDatasetDecl datasetDecl;
- protected ARecordType itemType;
+ protected ARecordType itemType;
protected ISerializerDeserializer payloadSerde;
protected IFileSplitProvider primaryFileSplitProvider;
protected AlgebricksPartitionConstraint primaryPartitionConstraint;
@@ -79,13 +81,15 @@
protected IBinaryComparatorFactory[] secondaryComparatorFactories;
protected RecordDescriptor secondaryRecDesc;
protected ICopyEvaluatorFactory[] evalFactories;
-
+
// Prevent public construction. Should be created via createIndexCreator().
protected SecondaryIndexCreator(PhysicalOptimizationConfig physOptConf) {
this.physOptConf = physOptConf;
}
- public static SecondaryIndexCreator createIndexCreator(CompiledCreateIndexStatement createIndexStmt, AqlCompiledMetadataDeclarations metadata, PhysicalOptimizationConfig physOptConf) throws AsterixException, AlgebricksException {
+ public static SecondaryIndexCreator createIndexCreator(CompiledCreateIndexStatement createIndexStmt,
+ AqlCompiledMetadataDeclarations metadata, PhysicalOptimizationConfig physOptConf) throws AsterixException,
+ AlgebricksException {
SecondaryIndexCreator indexCreator = null;
switch (createIndexStmt.getIndexType()) {
case BTREE: {
@@ -107,12 +111,13 @@
indexCreator.init(createIndexStmt, metadata);
return indexCreator;
}
-
+
public abstract JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException;
-
+
public abstract JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException;
-
- protected void init(CompiledCreateIndexStatement createIndexStmt, AqlCompiledMetadataDeclarations metadata) throws AsterixException, AlgebricksException {
+
+ protected void init(CompiledCreateIndexStatement createIndexStmt, AqlCompiledMetadataDeclarations metadata)
+ throws AsterixException, AlgebricksException {
this.metadata = metadata;
datasetName = createIndexStmt.getDatasetName();
secondaryIndexName = createIndexStmt.getIndexName();
@@ -124,8 +129,7 @@
throw new AsterixException("Cannot index an external dataset (" + datasetName + ").");
}
itemType = (ARecordType) metadata.findType(datasetDecl.getItemTypeName());
- payloadSerde = AqlSerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(itemType);
+ payloadSerde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
numSecondaryKeys = createIndexStmt.getKeyFields().size();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadata
@@ -140,7 +144,7 @@
setPrimaryRecDescAndComparators();
setSecondaryRecDescAndComparators(createIndexStmt.getKeyFields());
}
-
+
protected void setPrimaryRecDescAndComparators() throws AlgebricksException {
int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(datasetDecl).size();
ISerializerDeserializer[] primaryRecFields = new ISerializerDeserializer[numPrimaryKeys + 1];
@@ -154,8 +158,8 @@
IAType keyType = evalFactoryAndType.third;
ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
primaryRecFields[i] = keySerde;
- primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, true);
+ primaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, true);
primaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
@@ -163,8 +167,9 @@
primaryTypeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
primaryRecDesc = new RecordDescriptor(primaryRecFields, primaryTypeTraits);
}
-
- protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) throws AlgebricksException, AsterixException {
+
+ protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) throws AlgebricksException,
+ AsterixException {
evalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
secondaryComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys + numPrimaryKeys];
ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
@@ -173,13 +178,14 @@
for (int i = 0; i < numSecondaryKeys; i++) {
evalFactories[i] = metadata.getFormat().getFieldAccessEvaluatorFactory(itemType, secondaryKeyFields.get(i),
numPrimaryKeys);
- Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
+ Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
+ secondaryKeyFields.get(i), itemType);
IAType keyType = keyTypePair.first;
anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(keyType);
secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(keyType, true);
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ keyType, true);
secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
// Add serializers and comparators for primary index fields.
@@ -190,9 +196,9 @@
}
secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
}
-
- protected AbstractOperatorDescriptor createDummyKeyProviderOp(
- JobSpecification spec) throws AsterixException, AlgebricksException {
+
+ protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AsterixException,
+ AlgebricksException {
// Build dummy tuple containing one field with a dummy value inside.
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
DataOutput dos = tb.getDataOutput();
@@ -207,14 +213,13 @@
tb.addFieldEndOffset();
ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(
- spec, keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
- tb.getSize());
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(
- spec, keyProviderOp, primaryPartitionConstraint);
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
+ primaryPartitionConstraint);
return keyProviderOp;
}
-
+
protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
// -Infinity
int[] lowKeyFields = null;
@@ -228,7 +233,7 @@
primaryPartitionConstraint);
return primarySearchOp;
}
-
+
protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec,
BTreeSearchOperatorDescriptor primaryScanOp, int numSecondaryKeyFields) throws AlgebricksException {
int[] outColumns = new int[numSecondaryKeyFields];
@@ -243,15 +248,19 @@
for (int i = 0; i < numPrimaryKeys; i++) {
projectionList[projCount++] = i;
}
-
- AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
+ IScalarEvaluatorFactory[] sefs = new IScalarEvaluatorFactory[evalFactories.length];
+ for (int i = 0; i < evalFactories.length; ++i) {
+ sefs[i] = new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(
+ evalFactories[i]);
+ }
+ AssignRuntimeFactory assign = new AssignRuntimeFactory(outColumns, sefs, projectionList);
AlgebricksMetaOperatorDescriptor asterixAssignOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { secondaryRecDesc });
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixAssignOp,
primaryPartitionConstraint);
return asterixAssignOp;
}
-
+
protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
int[] sortFields = new int[secondaryComparatorFactories.length];
@@ -260,14 +269,13 @@
}
ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
physOptConf.getMaxFramesExternalSort(), sortFields, secondaryComparatorFactories, secondaryRecDesc);
- AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp,
- primaryPartitionConstraint);
+ AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
return sortOp;
}
-
+
protected TreeIndexBulkLoadOperatorDescriptor createTreeIndexBulkLoadOp(JobSpecification spec,
- int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor) throws MetadataException,
- AlgebricksException {
+ int numSecondaryKeyFields, IIndexDataflowHelperFactory dataflowHelperFactory, float fillFactor)
+ throws MetadataException, AlgebricksException {
int[] fieldPermutation = new int[numSecondaryKeyFields + numPrimaryKeys];
for (int i = 0; i < numSecondaryKeyFields + numPrimaryKeys; i++) {
fieldPermutation[i] = i;
@@ -282,16 +290,19 @@
secondarySplitsAndConstraint.second);
return treeIndexBulkLoadOp;
}
-
- public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields) throws AlgebricksException {
+
+ public AlgebricksMetaOperatorDescriptor createFilterNullsSelectOp(JobSpecification spec, int numSecondaryKeyFields)
+ throws AlgebricksException {
ICopyEvaluatorFactory[] andArgsEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeyFields];
NotDescriptor notDesc = new NotDescriptor();
IsNullDescriptor isNullDesc = new IsNullDescriptor();
for (int i = 0; i < numSecondaryKeyFields; i++) {
// Access column i, and apply 'is not null'.
ColumnAccessEvalFactory columnAccessEvalFactory = new ColumnAccessEvalFactory(i);
- ICopyEvaluatorFactory isNullEvalFactory = isNullDesc.createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
- ICopyEvaluatorFactory notEvalFactory = notDesc.createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
+ ICopyEvaluatorFactory isNullEvalFactory = isNullDesc
+ .createEvaluatorFactory(new ICopyEvaluatorFactory[] { columnAccessEvalFactory });
+ ICopyEvaluatorFactory notEvalFactory = notDesc
+ .createEvaluatorFactory(new ICopyEvaluatorFactory[] { isNullEvalFactory });
andArgsEvalFactories[i] = notEvalFactory;
}
ICopyEvaluatorFactory selectCond = null;
@@ -302,7 +313,9 @@
} else {
selectCond = andArgsEvalFactories[0];
}
- StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(selectCond, null, AqlBinaryBooleanInspectorImpl.INSTANCE);
+ StreamSelectRuntimeFactory select = new StreamSelectRuntimeFactory(
+ new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.ScalarEvaluatorFactoryAdapter(selectCond),
+ null, AqlBinaryBooleanInspectorImpl.INSTANCE);
AlgebricksMetaOperatorDescriptor asterixSelectOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
new IPushRuntimeFactory[] { select }, new RecordDescriptor[] { secondaryRecDesc });
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixSelectOp,
diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
index b2db19a..6980b14 100644
--- a/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
+++ b/asterix-app/src/main/java/edu/uci/ics/asterix/file/SecondaryRTreeCreator.java
@@ -38,7 +38,7 @@
protected IPrimitiveValueProviderFactory[] valueProviderFactories;
protected int numNestedSecondaryKeyFields;
-
+
protected SecondaryRTreeCreator(PhysicalOptimizationConfig physOptConf) {
super(physOptConf);
}
@@ -56,7 +56,7 @@
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
-
+
@Override
protected void setSecondaryRecDescAndComparators(List<String> secondaryKeyFields) throws AlgebricksException,
AsterixException {
@@ -67,7 +67,8 @@
+ numSecondaryKeys
+ " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
}
- Pair<IAType, Boolean> spatialTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
+ Pair<IAType, Boolean> spatialTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
+ secondaryKeyFields.get(0), itemType);
IAType spatialType = spatialTypePair.first;
anySecondaryKeyIsNullable = spatialTypePair.second;
if (spatialType == null) {
@@ -87,8 +88,8 @@
ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(nestedKeyType);
secondaryRecFields[i] = keySerde;
- secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE
- .getBinaryComparatorFactory(nestedKeyType, true);
+ secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
+ nestedKeyType, true);
secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
@@ -99,26 +100,27 @@
}
secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
}
-
+
@Override
public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = new JobSpecification();
-
+
// Create dummy key provider for feeding the primary index scan.
AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
-
+
// Create primary index scan op.
BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
// Assign op.
- AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numNestedSecondaryKeyFields);
+ AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp,
+ numNestedSecondaryKeyFields);
// If any of the secondary fields are nullable, then add a select op that filters nulls.
AlgebricksMetaOperatorDescriptor selectOp = null;
if (anySecondaryKeyIsNullable) {
selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFields);
}
-
+
// Create secondary RTree bulk load op.
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec,
numNestedSecondaryKeyFields, new RTreeDataflowHelperFactory(valueProviderFactories),
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index 397de61..3affe24 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -50,7 +50,7 @@
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ILogicalExpressionJobGen;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -64,6 +64,7 @@
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -756,9 +757,9 @@
if (filterExpr == null) {
return null;
}
- ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
- ICopyEvaluatorFactory filterEvalFactory = exprJobGen.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
- context);
+ IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+ IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
+ typeEnv, inputSchemas, context);
return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspector());
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
index 225377e..da006bd 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilter.java
@@ -17,28 +17,28 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import edu.uci.ics.hyracks.dataflow.common.data.accessors.ArrayBackedValueStorage;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.VoidPointable;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
public class AsterixTupleFilter implements ITupleFilter {
-
- private final IBinaryBooleanInspector boolInspector;
- private final ICopyEvaluator eval;
- private final ArrayBackedValueStorage evalOut = new ArrayBackedValueStorage();
-
- public AsterixTupleFilter(ICopyEvaluatorFactory evalFactory,
- IBinaryBooleanInspector boolInspector) throws AlgebricksException {
- this.boolInspector = boolInspector;
- eval = evalFactory.createEvaluator(evalOut);
- }
-
- @Override
- public boolean accept(IFrameTupleReference tuple) throws Exception {
- evalOut.reset();
- eval.evaluate(tuple);
- return boolInspector.getBooleanValue(evalOut.getByteArray(), 0, 2);
- }
+
+ private final IBinaryBooleanInspector boolInspector;
+ private final IScalarEvaluator eval;
+ private final IPointable p = VoidPointable.FACTORY.createPointable();
+
+ public AsterixTupleFilter(IScalarEvaluatorFactory evalFactory, IBinaryBooleanInspector boolInspector)
+ throws AlgebricksException {
+ this.boolInspector = boolInspector;
+ eval = evalFactory.createScalarEvaluator();
+ }
+
+ @Override
+ public boolean accept(IFrameTupleReference tuple) throws Exception {
+ eval.evaluate(tuple, p);
+ return boolInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+ }
}
diff --git a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java
index 2f4e71b..1495b72 100644
--- a/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java
+++ b/asterix-runtime/src/main/java/edu/uci/ics/asterix/runtime/base/AsterixTupleFilterFactory.java
@@ -17,23 +17,23 @@
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspector;
-import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilter;
import edu.uci.ics.hyracks.storage.am.common.api.ITupleFilterFactory;
public class AsterixTupleFilterFactory implements ITupleFilterFactory {
private static final long serialVersionUID = 1L;
-
+
private final IBinaryBooleanInspector boolInspector;
- private final ICopyEvaluatorFactory evalFactory;
-
- public AsterixTupleFilterFactory(ICopyEvaluatorFactory evalFactory,
- IBinaryBooleanInspector boolInspector) throws AlgebricksException {
+ private final IScalarEvaluatorFactory evalFactory;
+
+ public AsterixTupleFilterFactory(IScalarEvaluatorFactory evalFactory, IBinaryBooleanInspector boolInspector)
+ throws AlgebricksException {
this.evalFactory = evalFactory;
this.boolInspector = boolInspector;
}
-
+
@Override
public ITupleFilter createTupleFilter() throws Exception {
return new AsterixTupleFilter(evalFactory, boolInspector);