added CommitOperator and the related rewrite rule

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization@838 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/CommitOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/CommitOperator.java
index 8fa5091..cd34605 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/CommitOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/CommitOperator.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.asterix.algebra.operators;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
index e109d22..a46e52a 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitPOperator.java
@@ -1,10 +1,30 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.asterix.algebra.operators.physical;
 
+import java.util.List;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractPhysicalOperator;
@@ -13,16 +33,27 @@
 import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
-import edu.uci.ics.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class CommitPOperator extends AbstractPhysicalOperator {
 
+    private final List<LogicalVariable> primaryKeyLogicalVars;
+    private final JobId jobId;
+    private final int datasetId;
+
+    public CommitPOperator(JobId jobId, int datasetId, List<LogicalVariable> primaryKeyLogicalVars) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyLogicalVars = primaryKeyLogicalVars;
+    }
+
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.EXTENSION_OPERATOR;
     }
-    
+
     @Override
     public String toString() {
         return "COMMIT";
@@ -45,9 +76,16 @@
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
+
         RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema,
                 context);
-        CommitRuntimeFactory runtime = new CommitRuntimeFactory();
+        int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
+        IVariableTypeEnvironment varTypeEnv = context.getTypeEnvironment(op.getInputs().get(0).getValue());
+        IBinaryHashFunctionFactory[] binaryHashFunctionFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
+                primaryKeyLogicalVars, varTypeEnv, context);
+
+        CommitRuntimeFactory runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
+                binaryHashFunctionFactories);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
index 0662aa1..9f04a0e 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntime.java
@@ -1,36 +1,97 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.asterix.algebra.operators.physical;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.common.context.AsterixAppRuntimeContext;
+import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
+import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.FieldsHashValueGenerator;
+import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
 import edu.uci.ics.hyracks.api.comm.IFrameWriter;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
 import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 
 public class CommitRuntime implements IPushRuntime {
-    
-    private RecordDescriptor inputRecordDesc;
 
-    public CommitRuntime(IHyracksTaskContext ctx) {
-        // TODO Auto-generated constructor stub
+    private final IHyracksTaskContext hyracksTaskCtx;
+    private final ITransactionManager transactionManager;
+    private final JobId jobId;
+    private final DatasetId datasetId;
+    private final int[] primaryKeyFields;
+    private final IBinaryHashFunction[] primaryKeyHashFunctions;
+
+    private TransactionContext transactionContext;
+    private RecordDescriptor inputRecordDesc;
+    private FrameTupleAccessor frameTupleAccessor;
+    private FrameTupleReference frameTupleReference;
+
+    public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] binaryHashFunctionFactories) {
+        this.hyracksTaskCtx = ctx;
+        AsterixAppRuntimeContext runtimeCtx = (AsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
+                .getApplicationObject();
+        this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager();
+        this.jobId = jobId;
+        this.datasetId = new DatasetId(datasetId);
+        this.primaryKeyFields = primaryKeyFields;
+        primaryKeyHashFunctions = new IBinaryHashFunction[binaryHashFunctionFactories.length];
+        for (int i = 0; i < binaryHashFunctionFactories.length; ++i) {
+            this.primaryKeyHashFunctions[i] = binaryHashFunctionFactories[i].createBinaryHashFunction();
+        }
+        this.frameTupleReference = new FrameTupleReference();
     }
 
     @Override
     public void open() throws HyracksDataException {
-        // TODO Auto-generated method stub
-        
-        //System.out.println("EEEEEEEEEEEEEEEEEEEEE open\n");
-
+        try {
+            transactionContext = transactionManager.getTransactionContext(jobId);
+        } catch (ACIDException e) {
+            throw new HyracksDataException(e);
+        }
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-        // TODO Auto-generated method stub
-
-        
-        //System.out.println("EEEEEEEEEEEEEEEEEEEEE nextFrame - commit\n");
-
+        int pkHash = 0;
+        frameTupleAccessor.reset(buffer);
+        int nTuple = frameTupleAccessor.getTupleCount();
+        for (int t = 0; t < nTuple; t++) {
+            frameTupleReference.reset(frameTupleAccessor, t);
+            pkHash = FieldsHashValueGenerator.computeFieldsHashValue(frameTupleReference, primaryKeyFields,
+                    primaryKeyHashFunctions);
+            try {
+                transactionManager.commitTransaction(transactionContext, datasetId, pkHash);
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
+        }
     }
 
     @Override
@@ -42,7 +103,6 @@
     @Override
     public void close() throws HyracksDataException {
         // TODO Auto-generated method stub
-        //System.out.println("EEEEEEEEEEEEEEEEEEEEE close\n");
     }
 
     @Override
@@ -53,6 +113,6 @@
     @Override
     public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
         this.inputRecordDesc = recordDescriptor;
+        this.frameTupleAccessor = new FrameTupleAccessor(hyracksTaskCtx.getFrameSize(), recordDescriptor);
     }
-
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
index aa809b9..df7da97 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/algebra/operators/physical/CommitRuntimeFactory.java
@@ -1,15 +1,47 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package edu.uci.ics.asterix.algebra.operators.physical;
 
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntime;
 import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 
 public class CommitRuntimeFactory implements IPushRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public CommitRuntimeFactory() {
+    private final JobId jobId;
+    private final int datasetId;
+    private final int[] primaryKeyFields;
+    IBinaryHashFunctionFactory[] binaryHashFunctionFactories;
+
+    public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+            IBinaryHashFunctionFactory[] binaryHashFunctionFactories) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.primaryKeyFields = primaryKeyFields;
+        this.binaryHashFunctionFactories = binaryHashFunctionFactories;
     }
 
     @Override
@@ -19,6 +51,6 @@
 
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws AlgebricksException {
-        return new CommitRuntime(ctx);
+        return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, binaryHashFunctionFactories);
     }
 }
diff --git a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
index 0970b17..02b7d1b 100644
--- a/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
+++ b/asterix-algebra/src/main/java/edu/uci/ics/asterix/optimizer/rules/ReplaceSinkOpWithCommitOpRule.java
@@ -1,16 +1,23 @@
 package edu.uci.ics.asterix.optimizer.rules;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
 
 import edu.uci.ics.asterix.algebra.operators.CommitOperator;
 import edu.uci.ics.asterix.algebra.operators.physical.CommitPOperator;
+import edu.uci.ics.asterix.metadata.declared.AqlDataSource;
+import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
+import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
@@ -37,29 +44,46 @@
         SinkOperator sinkOperator = (SinkOperator) op;
 
         List<Mutable<ILogicalExpression>> primaryKeyExprs = null;
+        int datasetId = 0;
         AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) sinkOperator.getInputs().get(0).getValue();
         while (descendantOp != null) {
             if (descendantOp.getOperatorTag() == LogicalOperatorTag.INDEX_INSERT_DELETE) {
                 IndexInsertDeleteOperator indexInsertDeleteOperator = (IndexInsertDeleteOperator) descendantOp;
                 primaryKeyExprs = indexInsertDeleteOperator.getPrimaryKeyExpressions();
+                datasetId = ((AqlDataSource) indexInsertDeleteOperator.getDataSourceIndex().getDataSource()).getDataset().getDatasetId();
                 break;
             } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE) {
                 InsertDeleteOperator insertDeleteOperator = (InsertDeleteOperator) descendantOp;
                 primaryKeyExprs = insertDeleteOperator.getPrimaryKeyExpressions();
+                datasetId = ((AqlDataSource) insertDeleteOperator.getDataSource()).getDataset().getDatasetId();
                 break;
             }
             descendantOp = (AbstractLogicalOperator) descendantOp.getInputs().get(0).getValue();
         }
-        
+
         if (primaryKeyExprs != null) {
-            //TODO
-            //set the proper parameters for the constructor of the CommitOperator.
-            //also, set the CommitPOperator, too.
+
+            //copy primaryKeyExprs
+            List<LogicalVariable> primaryKeyLogicalVars = new ArrayList<LogicalVariable>();
+            for (Mutable<ILogicalExpression> expr : primaryKeyExprs) {
+                VariableReferenceExpression varRefExpr = (VariableReferenceExpression)expr.getValue();
+                primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId()));
+            }
+
+            //get JobId(TransactorId)
+            AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
+            JobId jobId = mp.getJobId();
+
+            //create the logical and physical operator
             CommitOperator commitOperator = new CommitOperator();
-            ExtensionOperator extensionOperator = new ExtensionOperator(commitOperator);
-            CommitPOperator commitPOperator = new CommitPOperator();
+            CommitPOperator commitPOperator = new CommitPOperator(jobId, datasetId, primaryKeyLogicalVars);
             commitOperator.setPhysicalOperator(commitPOperator);
+
+            //create ExtensionOperator and put the commitOperator in it.
+            ExtensionOperator extensionOperator = new ExtensionOperator(commitOperator);
             extensionOperator.setPhysicalOperator(commitPOperator);
+            
+            //update plan link
             extensionOperator.getInputs().add(sinkOperator.getInputs().get(0));
             context.computeAndSetTypeEnvironmentForOperator(extensionOperator);
             opRef.setValue(extensionOperator);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
index a907ee3..066a611 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/AbstractOperationCallback.java
@@ -17,6 +17,7 @@
 
 import edu.uci.ics.asterix.transaction.management.service.locking.ILockManager;
 import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId;
+import edu.uci.ics.asterix.transaction.management.service.transaction.FieldsHashValueGenerator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
 import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -40,14 +41,7 @@
 
     public int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields,
             IBinaryHashFunction[] primaryKeyHashFunctions) {
-        int h = 0;
-        for (int i = 0; i < primaryKeyFields.length; i++) {
-            int primaryKeyFieldIdx = primaryKeyFields[i];
-            int fh = primaryKeyHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx),
-                    tuple.getFieldStart(primaryKeyFieldIdx), tuple.getFieldLength(primaryKeyFieldIdx));
-            h = h * 31 + fh;
-        }
-        return h;
+        return FieldsHashValueGenerator.computeFieldsHashValue(tuple, primaryKeyFields, primaryKeyHashFunctions);
     }
     
     public TransactionContext getTransactionContext() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index db603da..c7189f9 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -532,7 +532,19 @@
                 throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
             }
         }
+        
+        //////////////////////////////////////////////////////////////
+        //TODO
+        //Check whether the dLockInfo or jobInfo could be null 
+        //even when the callback is called properly
+        if (dLockInfo == null || jobInfo == null) {
+            unlatchLockTable();
+            return;
+        }
+        /////////////////////////////////////////////////////////////
+        
         eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
+        
         if (IS_DEBUG_MODE) {
             if (eLockInfo == -1) {
                 throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
new file mode 100644
index 0000000..ca5a71e
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009-2012 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.service.transaction;
+
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunction;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+public class FieldsHashValueGenerator {
+    public static int computeFieldsHashValue(ITupleReference tuple, int[] fieldIndexes,
+            IBinaryHashFunction[] fieldHashFunctions) {
+        int h = 0;
+        for (int i = 0; i < fieldIndexes.length; i++) {
+            int primaryKeyFieldIdx = fieldIndexes[i];
+            int fh = fieldHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx),
+                    tuple.getFieldStart(primaryKeyFieldIdx), tuple.getFieldLength(primaryKeyFieldIdx));
+            h = h * 31 + fh;
+        }
+        return h;
+    }
+}
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index f537e88..b93425c 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -95,7 +95,7 @@
 
             //for entity-level commit
             if (PKHashVal != -1) {
-                transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext);
+                transactionProvider.getLockManager().unlock(datasetId, PKHashVal, txnContext, true);
                 return;
             }