Enable commit runtime extension
Change-Id: I98083ea5e93cb5f45d92c5dfbacfee1020fad57a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1485
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index 8588381..5a723b3 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -226,10 +226,6 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-bloomfilter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>algebricks-runtime</artifactId>
</dependency>
<dependency>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 1c01c40..d0cee55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -23,6 +23,7 @@
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -36,6 +37,7 @@
import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.io.FileSplit;
@@ -43,20 +45,16 @@
private final List<LogicalVariable> primaryKeyLogicalVars;
private final JobId jobId;
- private final int datasetId;
- private final String dataverse;
- private final String dataset;
+ private final Dataset dataset;
private final LogicalVariable upsertVar;
private final boolean isSink;
- public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId,
- List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
+ public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars,
+ LogicalVariable upsertVar, boolean isSink) {
this.jobId = jobId;
- this.datasetId = datasetId;
+ this.dataset = dataset;
this.primaryKeyLogicalVars = primaryKeyLogicalVars;
this.upsertVar = upsertVar;
- this.dataverse = dataverse;
- this.dataset = dataset;
this.isSink = isSink;
}
@@ -86,28 +84,26 @@
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
- throws AlgebricksException {
+ throws AlgebricksException {
MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
- RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
- context);
+ RecordDescriptor recDesc =
+ JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
//get dataset splits
FileSplit[] splitsForDataset = metadataProvider.splitsForDataset(metadataProvider.getMetadataTxnContext(),
- dataverse, dataset, dataset, metadataProvider.isTemporaryDatasetWriteJob());
+ dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
+ metadataProvider.isTemporaryDatasetWriteJob());
int[] datasetPartitions = new int[splitsForDataset.length];
for (int i = 0; i < splitsForDataset.length; i++) {
datasetPartitions[i] = i;
}
-
int upsertVarIdx = -1;
- CommitRuntimeFactory runtime = null;
if (upsertVar != null) {
upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
}
- runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
- metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
- datasetPartitions, isSink);
+ IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider,
+ upsertVarIdx, datasetPartitions, isSink);
builder.contributeMicroOperator(op, runtime, recDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 5bafe83..9b442ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -27,6 +27,7 @@
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -70,9 +71,7 @@
boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink();
List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
- int datasetId = 0;
- String dataverse = null;
- String datasetName = null;
+ Dataset dataset = null;
AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
LogicalVariable upsertVar = null;
while (descendantOp != null) {
@@ -80,29 +79,19 @@
IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
if (!operator.isBulkload() && operator.getPrevSecondaryKeyExprs() == null) {
primaryKeyExprs = operator.getPrimaryKeyExpressions();
- datasetId = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
- .getDatasetId();
- dataverse = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
- .getDataverseName();
- datasetName = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
- .getDatasetName();
+ dataset = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset();
break;
}
} else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp;
if (!insertDeleteUpsertOperator.isBulkload()) {
primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
- datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
- .getDatasetId();
- dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
- .getDataverseName();
- datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
- .getDatasetName();
+ dataset = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset();
if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
//we need to add a function that checks if previous record was found
upsertVar = context.newVar();
- AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression(
- FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
+ AbstractFunctionCallExpression orFunc =
+ new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
// is new value missing? -> this means that the expected operation is delete
AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING));
@@ -116,11 +105,10 @@
orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
// AssignOperator puts in the cast var the casted record
- AssignOperator upsertFlagAssign = new AssignOperator(upsertVar,
- new MutableObject<ILogicalExpression>(orFunc));
+ AssignOperator upsertFlagAssign =
+ new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
// Connect the current top of the plan to the cast operator
- upsertFlagAssign.getInputs()
- .add(new MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue()));
+ upsertFlagAssign.getInputs().add(new MutableObject<>(eOp.getInputs().get(0).getValue()));
eOp.getInputs().clear();
eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
@@ -151,8 +139,8 @@
//create the logical and physical operator
CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
- CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
- primaryKeyLogicalVars, upsertVar, isSink);
+ CommitPOperator commitPOperator =
+ new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, upsertVar, isSink);
commitOperator.setPhysicalOperator(commitPOperator);
//create ExtensionOperator and put the commitOperator in it.
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index bc18045..cc12f36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -25,7 +25,6 @@
import java.util.Map;
import java.util.logging.Logger;
-import org.apache.asterix.algebra.operators.physical.CommitRuntime;
import org.apache.asterix.app.external.TestLibrarian;
import org.apache.asterix.app.nc.NCAppRuntimeContext;
import org.apache.asterix.app.nc.TransactionSubsystem;
@@ -57,6 +56,7 @@
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.runtime.CommitRuntime;
import org.apache.asterix.transaction.management.service.logging.LogReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 1f1c139..f8755b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -18,6 +18,12 @@
*/
package org.apache.asterix.external.library.java;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -87,12 +93,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.util.string.UTF8StringReader;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
-
public class JObjectAccessors {
public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
@@ -455,7 +455,7 @@
this.typeInfo = new TypeInfo(objectPool, null, null);
this.jObjects = new IJObject[recordType.getFieldNames().length];
this.jRecord = new JRecord(recordType, jObjects);
- this.openFields = new LinkedHashMap<String, IJObject>();
+ this.openFields = new LinkedHashMap<>();
}
@Override
@@ -473,12 +473,11 @@
for (IVisitablePointable fieldPointable : fieldPointables) {
closedPart = index < recordType.getFieldTypes().length;
IVisitablePointable tt = fieldTypeTags.get(index);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
- .deserialize(tt.getByteArray()[tt.getStartOffset()]);
+ ATypeTag typeTag =
+ EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt.getStartOffset()]);
IAType fieldType;
- fieldType = closedPart ?
- recordType.getFieldTypes()[index] :
- TypeTagUtil.getBuiltinTypeByTag(typeTag);
+ fieldType =
+ closedPart ? recordType.getFieldTypes()[index] : TypeTagUtil.getBuiltinTypeByTag(typeTag);
IVisitablePointable fieldName = fieldNames.get(index);
typeInfo.reset(fieldType, typeTag);
switch (typeTag) {
@@ -491,8 +490,8 @@
// value is null
fieldObject = null;
} else {
- fieldObject = pointableVisitor
- .visit((AListVisitablePointable) fieldPointable, typeInfo);
+ fieldObject =
+ pointableVisitor.visit((AListVisitablePointable) fieldPointable, typeInfo);
}
break;
case ANY:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 4ebf055..55cd304 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -66,11 +67,13 @@
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -241,13 +244,8 @@
return false;
}
Dataset otherDataset = (Dataset) other;
- if (!otherDataset.dataverseName.equals(dataverseName)) {
- return false;
- }
- if (!otherDataset.datasetName.equals(datasetName)) {
- return false;
- }
- return true;
+ return Objects.equals(dataverseName, otherDataset.dataverseName)
+ && Objects.equals(datasetName, otherDataset.datasetName);
}
public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method is meant to be extended
@@ -567,10 +565,13 @@
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((datasetName == null) ? 0 : datasetName.hashCode());
- result = prime * result + ((dataverseName == null) ? 0 : dataverseName.hashCode());
- return result;
+ return Objects.hash(dataverseName, datasetName);
+ }
+
+ public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields,
+ MetadataProvider metadataProvider, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+ return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
+ metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
+ datasetPartitions, isSink);
}
}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
index 511ea9f..0a2f166 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
private static final long serialVersionUID = 1L;
public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
- public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
- .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+ public static final IPrinter PRINTER =
+ (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
@Override
public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
index 666fa0a..2a878ac 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
@@ -29,8 +29,8 @@
private static final long serialVersionUID = 1L;
public static final ShortWithoutTypeInfoPrinterFactory INSTANCE = new ShortWithoutTypeInfoPrinterFactory();
- public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
- .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+ public static final IPrinter PRINTER =
+ (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
@Override
public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
index 4aa6ccd..c500e86 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
private static final long serialVersionUID = 1L;
public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
- public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
- .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+ public static final IPrinter PRINTER =
+ (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
@Override
public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
index ebc09a0..aa6fcbe 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
private static final long serialVersionUID = 1L;
public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
- public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
- .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+ public static final IPrinter PRINTER =
+ (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
@Override
public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
index b65897b..959c4ad 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
private static final long serialVersionUID = 1L;
public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
- public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) ->
- ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+ public static final IPrinter PRINTER =
+ (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
@Override
public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
index 227c2cd..7d6a078 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
@@ -29,7 +29,6 @@
public class ABooleanSerializerDeserializer implements ISerializerDeserializer<ABoolean> {
private static final long serialVersionUID = 1L;
-
public static final ABooleanSerializerDeserializer INSTANCE = new ABooleanSerializerDeserializer();
private ABooleanSerializerDeserializer() {
@@ -54,11 +53,6 @@
}
public static boolean getBoolean(byte[] bytes, int offset) {
- if (bytes[offset] == 0) {
- return false;
- } else {
- return true;
- }
+ return bytes[offset] != 0;
}
-
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
index d9cfc67..ef727c9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.AOrderedListType;
import org.apache.asterix.om.types.BuiltinType;
@@ -31,7 +32,6 @@
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -75,7 +75,7 @@
int gramLength = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GRAM_TOKENS.getName(), 1,
gramLengthArg.getByteArray(), gramLengthArg.getStartOffset());
tokenizer.setGramlength(gramLength);
- boolean prePost = BooleanPointable.getBoolean(prePostArg.getByteArray(),
+ boolean prePost = ABooleanSerializerDeserializer.getBoolean(prePostArg.getByteArray(),
prePostArg.getStartOffset() + typeIndicatorSize);
tokenizer.setPrePost(prePost);
tokenizer.reset(stringArg.getByteArray(), stringArg.getStartOffset(), stringArg.getLength());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
index 9fd5dc4..e9f9c9e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
@@ -74,14 +74,14 @@
return new IScalarEvaluator() {
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+ private ISerializerDeserializer<ABoolean> booleanSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
+ private ISerializerDeserializer<ANull> nullSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AMISSING);
+ private ISerializerDeserializer<AMissing> missingSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -111,7 +111,7 @@
ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
}
boolean argResult = ABooleanSerializerDeserializer.getBoolean(bytes, offset + 1);
- if (! argResult) {
+ if (!argResult) {
// anything AND FALSE = FALSE
booleanSerde.serialize(ABoolean.FALSE, out);
result.set(resultStorage);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
index f6c8c4f..0509f51 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
@@ -22,6 +22,7 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -35,7 +36,6 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -56,8 +56,8 @@
protected final IScalarEvaluator usePrePostEval;
@SuppressWarnings("unchecked")
- private final ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+ private final ISerializerDeserializer<ABoolean> booleanSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
private final UTF8StringPointable utf8Ptr = new UTF8StringPointable();
@@ -88,14 +88,12 @@
int strLen = utf8Ptr.getStringLength();
// Check type and extract edit-distance threshold.
- long edThresh = ATypeHierarchy.getIntegerValue(
- BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1, edThreshPtr.getByteArray(),
- edThreshPtr.getStartOffset());
+ long edThresh = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1,
+ edThreshPtr.getByteArray(), edThreshPtr.getStartOffset());
// Check type and extract gram length.
- long gramLen = ATypeHierarchy.getIntegerValue(
- BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2, gramLenPtr.getByteArray(),
- gramLenPtr.getStartOffset());
+ long gramLen = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2,
+ gramLenPtr.getByteArray(), gramLenPtr.getStartOffset());
// Check type and extract usePrePost flag.
typeTag = usePrePostPtr.getByteArray()[usePrePostPtr.getStartOffset()];
@@ -103,7 +101,7 @@
throw new TypeMismatchException(BuiltinFunctions.EDIT_DISTANCE_STRING_IS_FILTERABLE, 3, typeTag,
ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
}
- boolean usePrePost = BooleanPointable.getBoolean(usePrePostPtr.getByteArray(),
+ boolean usePrePost = ABooleanSerializerDeserializer.getBoolean(usePrePostPtr.getByteArray(),
usePrePostPtr.getStartOffset() + 1);
// Compute result.
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
index 32263ea..13037a9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
@@ -71,8 +71,8 @@
private IScalarEvaluator eval = args[0].createScalarEvaluator(ctx);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+ private ISerializerDeserializer<ABoolean> booleanSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -86,8 +86,8 @@
ABoolean aResult = argRes ? ABoolean.FALSE : ABoolean.TRUE;
booleanSerde.serialize(aResult, out);
} else {
- throw new TypeMismatchException(getIdentifier(), 0,
- bytes[offset], ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+ throw new TypeMismatchException(getIdentifier(), 0, bytes[offset],
+ ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
}
result.set(resultStorage);
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
index c7a608a..7aea25c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
@@ -74,14 +74,14 @@
private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
private DataOutput output = resultStorage.getDataOutput();
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+ private ISerializerDeserializer<ABoolean> booleanSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.ANULL);
+ private ISerializerDeserializer<ANull> nullSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
- .getSerializerDeserializer(BuiltinType.AMISSING);
+ private ISerializerDeserializer<AMissing> missingSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
@Override
public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -107,7 +107,7 @@
ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
}
boolean argResult = ABooleanSerializerDeserializer.getBoolean(data, offset + 1);
- if (argResult == true) {
+ if (argResult) {
// anything OR TRUE = TRUE
booleanSerde.serialize(ABoolean.TRUE, output);
result.set(resultStorage);
diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml
index 794595f..a65a436 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -24,7 +24,6 @@
<version>0.9.1-SNAPSHOT</version>
</parent>
<artifactId>asterix-transactions</artifactId>
-
<licenses>
<license>
<name>Apache License, Version 2.0</name>
@@ -33,11 +32,9 @@
<comments>A business-friendly OSS license</comments>
</license>
</licenses>
-
<properties>
<appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
</properties>
-
<build>
<plugins>
<plugin>
@@ -83,9 +80,7 @@
</executions>
</plugin>
</plugins>
-
</build>
-
<dependencies>
<dependency>
<groupId>org.apache.hyracks</groupId>
@@ -111,6 +106,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.asterix</groupId>
+ <artifactId>asterix-om</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -149,6 +149,13 @@
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-common</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>algebricks-runtime</artifactId>
+ </dependency>
</dependencies>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
similarity index 95%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
rename to asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 63a91ac..d38c5b7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.asterix.algebra.operators.physical;
+package org.apache.asterix.transaction.management.runtime;
import java.nio.ByteBuffer;
@@ -86,7 +86,7 @@
transactionContext = transactionManager.getTransactionContext(jobId, false);
transactionContext.setWriteTxn(isWriteTransaction);
ILogMarkerCallback callback =
- TaskUtil.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+ TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
logRecord = new LogRecord(callback);
if (isSink) {
return;
@@ -126,7 +126,7 @@
}
}
}
- VSizeFrame message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+ VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
if (message != null
&& MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
try {
@@ -183,5 +183,6 @@
@Override
public void flush() throws HyracksDataException {
+ // Commit is at the end of a modification pipeline and there is no need to flush
}
}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
similarity index 97%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
rename to asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 767d864..536e657 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.asterix.algebra.operators.physical;
+package org.apache.asterix.transaction.management.runtime;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
similarity index 97%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
rename to asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
index 53e0f62..9b2fe36 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.algebra.operators.physical;
+package org.apache.asterix.transaction.management.runtime;
import java.nio.ByteBuffer;
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
index db0b483..3df27ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
@@ -55,7 +55,7 @@
};
public static boolean getBoolean(byte[] bytes, int start) {
- return bytes[start] == 0 ? false : true;
+ return bytes[start] != 0;
}
public static void setBoolean(byte[] bytes, int start, boolean value) {