Enable commit runtime extension

Change-Id: I98083ea5e93cb5f45d92c5dfbacfee1020fad57a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1485
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamousaa@gmail.com>
diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml
index 8588381..5a723b3 100644
--- a/asterixdb/asterix-algebra/pom.xml
+++ b/asterixdb/asterix-algebra/pom.xml
@@ -226,10 +226,6 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
-      <artifactId>hyracks-storage-am-bloomfilter</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hyracks</groupId>
       <artifactId>algebricks-runtime</artifactId>
     </dependency>
     <dependency>
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 1c01c40..d0cee55 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -23,6 +23,7 @@
 
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
@@ -36,6 +37,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.io.FileSplit;
 
@@ -43,20 +45,16 @@
 
     private final List<LogicalVariable> primaryKeyLogicalVars;
     private final JobId jobId;
-    private final int datasetId;
-    private final String dataverse;
-    private final String dataset;
+    private final Dataset dataset;
     private final LogicalVariable upsertVar;
     private final boolean isSink;
 
-    public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId,
-            List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) {
+    public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars,
+            LogicalVariable upsertVar, boolean isSink) {
         this.jobId = jobId;
-        this.datasetId = datasetId;
+        this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.upsertVar = upsertVar;
-        this.dataverse = dataverse;
-        this.dataset = dataset;
         this.isSink = isSink;
     }
 
@@ -86,28 +84,26 @@
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
-        RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
-                context);
+        RecordDescriptor recDesc =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
         int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
 
         //get dataset splits
         FileSplit[] splitsForDataset = metadataProvider.splitsForDataset(metadataProvider.getMetadataTxnContext(),
-                dataverse, dataset, dataset, metadataProvider.isTemporaryDatasetWriteJob());
+                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
+                metadataProvider.isTemporaryDatasetWriteJob());
         int[] datasetPartitions = new int[splitsForDataset.length];
         for (int i = 0; i < splitsForDataset.length; i++) {
             datasetPartitions[i] = i;
         }
-
         int upsertVarIdx = -1;
-        CommitRuntimeFactory runtime = null;
         if (upsertVar != null) {
             upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
         }
-        runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
-                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
-                datasetPartitions, isSink);
+        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider,
+                upsertVarIdx, datasetPartitions, isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 5bafe83..9b442ae 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -70,9 +71,7 @@
         boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink();
 
         List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
-        int datasetId = 0;
-        String dataverse = null;
-        String datasetName = null;
+        Dataset dataset = null;
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue();
         LogicalVariable upsertVar = null;
         while (descendantOp != null) {
@@ -80,29 +79,19 @@
                 IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp;
                 if (!operator.isBulkload() && operator.getPrevSecondaryKeyExprs() == null) {
                     primaryKeyExprs = operator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
-                            .getDatasetId();
-                    dataverse = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
-                            .getDataverseName();
-                    datasetName = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset()
-                            .getDatasetName();
+                    dataset = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset();
                     break;
                 }
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) {
                 InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp;
                 if (!insertDeleteUpsertOperator.isBulkload()) {
                     primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions();
-                    datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetId();
-                    dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDataverseName();
-                    datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset()
-                            .getDatasetName();
+                    dataset = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset();
                     if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) {
                         //we need to add a function that checks if previous record was found
                         upsertVar = context.newVar();
-                        AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression(
-                                FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
+                        AbstractFunctionCallExpression orFunc =
+                                new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.OR));
                         // is new value missing? -> this means that the expected operation is delete
                         AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression(
                                 FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING));
@@ -116,11 +105,10 @@
                         orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc));
 
                         // AssignOperator puts in the cast var the casted record
-                        AssignOperator upsertFlagAssign = new AssignOperator(upsertVar,
-                                new MutableObject<ILogicalExpression>(orFunc));
+                        AssignOperator upsertFlagAssign =
+                                new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc));
                         // Connect the current top of the plan to the cast operator
-                        upsertFlagAssign.getInputs()
-                                .add(new MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue()));
+                        upsertFlagAssign.getInputs().add(new MutableObject<>(eOp.getInputs().get(0).getValue()));
                         eOp.getInputs().clear();
                         eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign));
                         context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign);
@@ -151,8 +139,8 @@
 
         //create the logical and physical operator
         CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
-        CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId,
-                primaryKeyLogicalVars, upsertVar, isSink);
+        CommitPOperator commitPOperator =
+                new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, upsertVar, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index bc18045..cc12f36 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -25,7 +25,6 @@
 import java.util.Map;
 import java.util.logging.Logger;
 
-import org.apache.asterix.algebra.operators.physical.CommitRuntime;
 import org.apache.asterix.app.external.TestLibrarian;
 import org.apache.asterix.app.nc.NCAppRuntimeContext;
 import org.apache.asterix.app.nc.TransactionSubsystem;
@@ -57,6 +56,7 @@
 import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
+import org.apache.asterix.transaction.management.runtime.CommitRuntime;
 import org.apache.asterix.transaction.management.service.logging.LogReader;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 1f1c139..f8755b4 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -18,6 +18,12 @@
  */
 package org.apache.asterix.external.library.java;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
@@ -87,12 +93,6 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.string.UTF8StringReader;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
-
 public class JObjectAccessors {
 
     public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) {
@@ -455,7 +455,7 @@
             this.typeInfo = new TypeInfo(objectPool, null, null);
             this.jObjects = new IJObject[recordType.getFieldNames().length];
             this.jRecord = new JRecord(recordType, jObjects);
-            this.openFields = new LinkedHashMap<String, IJObject>();
+            this.openFields = new LinkedHashMap<>();
         }
 
         @Override
@@ -473,12 +473,11 @@
                 for (IVisitablePointable fieldPointable : fieldPointables) {
                     closedPart = index < recordType.getFieldTypes().length;
                     IVisitablePointable tt = fieldTypeTags.get(index);
-                    ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
-                            .deserialize(tt.getByteArray()[tt.getStartOffset()]);
+                    ATypeTag typeTag =
+                            EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt.getStartOffset()]);
                     IAType fieldType;
-                    fieldType = closedPart ?
-                            recordType.getFieldTypes()[index] :
-                            TypeTagUtil.getBuiltinTypeByTag(typeTag);
+                    fieldType =
+                            closedPart ? recordType.getFieldTypes()[index] : TypeTagUtil.getBuiltinTypeByTag(typeTag);
                     IVisitablePointable fieldName = fieldNames.get(index);
                     typeInfo.reset(fieldType, typeTag);
                     switch (typeTag) {
@@ -491,8 +490,8 @@
                                 // value is null
                                 fieldObject = null;
                             } else {
-                                fieldObject = pointableVisitor
-                                        .visit((AListVisitablePointable) fieldPointable, typeInfo);
+                                fieldObject =
+                                        pointableVisitor.visit((AListVisitablePointable) fieldPointable, typeInfo);
                             }
                             break;
                         case ANY:
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 4ebf055..55cd304 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -66,11 +67,13 @@
 import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
+import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
@@ -241,13 +244,8 @@
             return false;
         }
         Dataset otherDataset = (Dataset) other;
-        if (!otherDataset.dataverseName.equals(dataverseName)) {
-            return false;
-        }
-        if (!otherDataset.datasetName.equals(datasetName)) {
-            return false;
-        }
-        return true;
+        return Objects.equals(dataverseName, otherDataset.dataverseName)
+                && Objects.equals(datasetName, otherDataset.datasetName);
     }
 
     public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method is meant to be extended
@@ -567,10 +565,13 @@
 
     @Override
     public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((datasetName == null) ? 0 : datasetName.hashCode());
-        result = prime * result + ((dataverseName == null) ? 0 : dataverseName.hashCode());
-        return result;
+        return Objects.hash(dataverseName, datasetName);
+    }
+
+    public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields,
+            MetadataProvider metadataProvider, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+        return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
+                metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx,
+                datasetPartitions, isSink);
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
index 511ea9f..0a2f166 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
index 666fa0a..2a878ac 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java
@@ -29,8 +29,8 @@
     private static final long serialVersionUID = 1L;
     public static final ShortWithoutTypeInfoPrinterFactory INSTANCE = new ShortWithoutTypeInfoPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
index 4aa6ccd..c500e86 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
index ebc09a0..aa6fcbe 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps
-            .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
index b65897b..959c4ad 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java
@@ -29,8 +29,8 @@
     private static final long serialVersionUID = 1L;
     public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory();
 
-    public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) ->
-        ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
+    public static final IPrinter PRINTER =
+            (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1));
 
     @Override
     public IPrinter createPrinter() {
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
index 227c2cd..7d6a078 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java
@@ -29,7 +29,6 @@
 public class ABooleanSerializerDeserializer implements ISerializerDeserializer<ABoolean> {
 
     private static final long serialVersionUID = 1L;
-
     public static final ABooleanSerializerDeserializer INSTANCE = new ABooleanSerializerDeserializer();
 
     private ABooleanSerializerDeserializer() {
@@ -54,11 +53,6 @@
     }
 
     public static boolean getBoolean(byte[] bytes, int offset) {
-        if (bytes[offset] == 0) {
-            return false;
-        } else {
-            return true;
-        }
+        return bytes[offset] != 0;
     }
-
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
index d9cfc67..ef727c9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 
 import org.apache.asterix.builders.OrderedListBuilder;
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.BuiltinType;
@@ -31,7 +32,6 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -75,7 +75,7 @@
         int gramLength = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GRAM_TOKENS.getName(), 1,
                 gramLengthArg.getByteArray(), gramLengthArg.getStartOffset());
         tokenizer.setGramlength(gramLength);
-        boolean prePost = BooleanPointable.getBoolean(prePostArg.getByteArray(),
+        boolean prePost = ABooleanSerializerDeserializer.getBoolean(prePostArg.getByteArray(),
                 prePostArg.getStartOffset() + typeIndicatorSize);
         tokenizer.setPrePost(prePost);
         tokenizer.reset(stringArg.getByteArray(), stringArg.getStartOffset(), stringArg.getLength());
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
index 9fd5dc4..e9f9c9e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java
@@ -74,14 +74,14 @@
 
                 return new IScalarEvaluator() {
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+                    private ISerializerDeserializer<ABoolean> booleanSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
+                    private ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AMISSING);
+                    private ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -111,7 +111,7 @@
                                         ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
                             }
                             boolean argResult = ABooleanSerializerDeserializer.getBoolean(bytes, offset + 1);
-                            if (! argResult) {
+                            if (!argResult) {
                                 // anything AND FALSE = FALSE
                                 booleanSerde.serialize(ABoolean.FALSE, out);
                                 result.set(resultStorage);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
index f6c8c4f..0509f51 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java
@@ -22,6 +22,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.functions.BuiltinFunctions;
@@ -35,7 +36,6 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
@@ -56,8 +56,8 @@
     protected final IScalarEvaluator usePrePostEval;
 
     @SuppressWarnings("unchecked")
-    private final ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+    private final ISerializerDeserializer<ABoolean> booleanSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
 
     private final UTF8StringPointable utf8Ptr = new UTF8StringPointable();
 
@@ -88,14 +88,12 @@
         int strLen = utf8Ptr.getStringLength();
 
         // Check type and extract edit-distance threshold.
-        long edThresh = ATypeHierarchy.getIntegerValue(
-                BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1, edThreshPtr.getByteArray(),
-                edThreshPtr.getStartOffset());
+        long edThresh = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1,
+                edThreshPtr.getByteArray(), edThreshPtr.getStartOffset());
 
         // Check type and extract gram length.
-        long gramLen = ATypeHierarchy.getIntegerValue(
-                BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2, gramLenPtr.getByteArray(),
-                gramLenPtr.getStartOffset());
+        long gramLen = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2,
+                gramLenPtr.getByteArray(), gramLenPtr.getStartOffset());
 
         // Check type and extract usePrePost flag.
         typeTag = usePrePostPtr.getByteArray()[usePrePostPtr.getStartOffset()];
@@ -103,7 +101,7 @@
             throw new TypeMismatchException(BuiltinFunctions.EDIT_DISTANCE_STRING_IS_FILTERABLE, 3, typeTag,
                     ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
         }
-        boolean usePrePost = BooleanPointable.getBoolean(usePrePostPtr.getByteArray(),
+        boolean usePrePost = ABooleanSerializerDeserializer.getBoolean(usePrePostPtr.getByteArray(),
                 usePrePostPtr.getStartOffset() + 1);
 
         // Compute result.
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
index 32263ea..13037a9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java
@@ -71,8 +71,8 @@
                     private IScalarEvaluator eval = args[0].createScalarEvaluator(ctx);
 
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+                    private ISerializerDeserializer<ABoolean> booleanSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -86,8 +86,8 @@
                             ABoolean aResult = argRes ? ABoolean.FALSE : ABoolean.TRUE;
                             booleanSerde.serialize(aResult, out);
                         } else {
-                            throw new TypeMismatchException(getIdentifier(), 0,
-                                    bytes[offset], ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+                            throw new TypeMismatchException(getIdentifier(), 0, bytes[offset],
+                                    ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
                         }
                         result.set(resultStorage);
                     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
index c7a608a..7aea25c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java
@@ -74,14 +74,14 @@
                     private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
                     private DataOutput output = resultStorage.getDataOutput();
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ABOOLEAN);
+                    private ISerializerDeserializer<ABoolean> booleanSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.ANULL);
+                    private ISerializerDeserializer<ANull> nullSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
                     @SuppressWarnings("unchecked")
-                    private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE
-                            .getSerializerDeserializer(BuiltinType.AMISSING);
+                    private ISerializerDeserializer<AMissing> missingSerde =
+                            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING);
 
                     @Override
                     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
@@ -107,7 +107,7 @@
                                         ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
                             }
                             boolean argResult = ABooleanSerializerDeserializer.getBoolean(data, offset + 1);
-                            if (argResult == true) {
+                            if (argResult) {
                                 // anything OR TRUE = TRUE
                                 booleanSerde.serialize(ABoolean.TRUE, output);
                                 result.set(resultStorage);
diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml
index 794595f..a65a436 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -24,7 +24,6 @@
     <version>0.9.1-SNAPSHOT</version>
   </parent>
   <artifactId>asterix-transactions</artifactId>
-
   <licenses>
     <license>
       <name>Apache License, Version 2.0</name>
@@ -33,11 +32,9 @@
       <comments>A business-friendly OSS license</comments>
     </license>
   </licenses>
-
   <properties>
     <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory>
   </properties>
-
   <build>
     <plugins>
       <plugin>
@@ -83,9 +80,7 @@
         </executions>
       </plugin>
     </plugins>
-
   </build>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.hyracks</groupId>
@@ -111,6 +106,11 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
@@ -149,6 +149,13 @@
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-am-bloomfilter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-runtime</artifactId>
+    </dependency>
   </dependencies>
-
-</project>
+</project>
\ No newline at end of file
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
similarity index 95%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
rename to asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 63a91ac..d38c5b7 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.asterix.algebra.operators.physical;
+package org.apache.asterix.transaction.management.runtime;
 
 import java.nio.ByteBuffer;
 
@@ -86,7 +86,7 @@
             transactionContext = transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
             ILogMarkerCallback callback =
-                    TaskUtil.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+                    TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
             logRecord = new LogRecord(callback);
             if (isSink) {
                 return;
@@ -126,7 +126,7 @@
                 }
             }
         }
-        VSizeFrame message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+        VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
         if (message != null
                 && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) {
             try {
@@ -183,5 +183,6 @@
 
     @Override
     public void flush() throws HyracksDataException {
+        // Commit is at the end of a modification pipeline and there is no need to flush
     }
 }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
similarity index 97%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
rename to asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 767d864..536e657 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.asterix.algebra.operators.physical;
+package org.apache.asterix.transaction.management.runtime;
 
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
similarity index 97%
rename from asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
rename to asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
index 53e0f62..9b2fe36 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.algebra.operators.physical;
+package org.apache.asterix.transaction.management.runtime;
 
 import java.nio.ByteBuffer;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
index db0b483..3df27ad 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java
@@ -55,7 +55,7 @@
     };
 
     public static boolean getBoolean(byte[] bytes, int start) {
-        return bytes[start] == 0 ? false : true;
+        return bytes[start] != 0;
     }
 
     public static void setBoolean(byte[] bytes, int start, boolean value) {