Code clean up and sonar fixes.

Change-Id: I55e2fbca5480ede8ff6bf47e843ca15c3d9d26d4
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1157
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <tillw@apache.org>
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
index 1bbd745..034cfeb 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/FunctionManagerImpl.java
@@ -35,12 +35,12 @@
     private final Map<Pair<FunctionIdentifier, Integer>, IFunctionDescriptorFactory> functions;
 
     public FunctionManagerImpl() {
-        functions = new HashMap<Pair<FunctionIdentifier, Integer>, IFunctionDescriptorFactory>();
+        functions = new HashMap<>();
     }
 
     @Override
     public synchronized IFunctionDescriptor lookupFunction(FunctionIdentifier fid) throws AlgebricksException {
-        Pair<FunctionIdentifier, Integer> key = new Pair<FunctionIdentifier, Integer>(fid, fid.getArity());
+        Pair<FunctionIdentifier, Integer> key = new Pair<>(fid, fid.getArity());
         IFunctionDescriptorFactory factory = functions.get(key);
         if (factory == null) {
             throw new AlgebricksException("Inappropriate use of function " + "'" + fid.getName() + "'");
@@ -58,7 +58,7 @@
     public synchronized void unregisterFunction(IFunctionDescriptorFactory descriptorFactory)
             throws AlgebricksException {
         FunctionIdentifier fid = descriptorFactory.createFunctionDescriptor().getIdentifier();
-        Pair<FunctionIdentifier, Integer> key = new Pair<FunctionIdentifier, Integer>(fid, fid.getArity());
+        Pair<FunctionIdentifier, Integer> key = new Pair<>(fid, fid.getArity());
         functions.remove(key);
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
index c42865c..b053d99 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/temporal/CalendarDurationFromDateTimeDescriptor.java
@@ -63,9 +63,9 @@
  * <p/>
  */
 public class CalendarDurationFromDateTimeDescriptor extends AbstractScalarFunctionDynamicDescriptor {
-    private final static long serialVersionUID = 1L;
-    public final static FunctionIdentifier FID = AsterixBuiltinFunctions.CALENDAR_DURATION_FROM_DATETIME;
-    public final static IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+    private static final long serialVersionUID = 1L;
+    public static final FunctionIdentifier FID = AsterixBuiltinFunctions.CALENDAR_DURATION_FROM_DATETIME;
+    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
 
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
index afcb86e..74c1ba5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/functions/FunctionIdentifier.java
@@ -27,7 +27,7 @@
     private final String name;
     private final int arity;
 
-    public final static int VARARGS = -1;
+    public static final int VARARGS = -1;
 
     public FunctionIdentifier(String namespace, String name) {
         this(namespace, name, VARARGS);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 34c707b..91dba24 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -100,7 +100,7 @@
         }
         ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
         IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
-        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+        return new Pair<>(conn, null);
     }
 
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
index 87ea627..e2643dd 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorPropertiesUtil.java
@@ -45,6 +45,9 @@
 
     private static final String MOVABLE = "isMovable";
 
+    private OperatorPropertiesUtil() {
+    }
+
     public static <T> boolean disjoint(Collection<T> c1, Collection<T> c2) {
         for (T m : c1) {
             if (c2.contains(m)) {
@@ -58,7 +61,7 @@
     private static void getFreeVariablesInOp(ILogicalOperator op, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
         VariableUtilities.getUsedVariables(op, freeVars);
-        HashSet<LogicalVariable> produced = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> produced = new HashSet<>();
         VariableUtilities.getProducedVariables(op, produced);
         for (LogicalVariable v : produced) {
             freeVars.remove(v);
@@ -75,13 +78,13 @@
      */
     public static void getFreeVariablesInSelfOrDesc(AbstractLogicalOperator op, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
-        HashSet<LogicalVariable> produced = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> produced = new HashSet<>();
         VariableUtilities.getProducedVariables(op, produced);
         for (LogicalVariable v : produced) {
             freeVars.remove(v);
         }
 
-        HashSet<LogicalVariable> used = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> used = new HashSet<>();
         VariableUtilities.getUsedVariables(op, used);
         for (LogicalVariable v : used) {
             freeVars.add(v);
@@ -108,7 +111,7 @@
      */
     public static void getFreeVariablesInPath(ILogicalOperator op, ILogicalOperator dest, Set<LogicalVariable> freeVars)
             throws AlgebricksException {
-        Set<LogicalVariable> producedVars = new ListSet<LogicalVariable>();
+        Set<LogicalVariable> producedVars = new ListSet<>();
         VariableUtilities.getLiveVariables(op, freeVars);
         collectUsedAndProducedVariablesInPath(op, dest, freeVars, producedVars);
         freeVars.removeAll(producedVars);
@@ -163,13 +166,13 @@
     }
 
     public static boolean hasFreeVariablesInSelfOrDesc(AbstractLogicalOperator op) throws AlgebricksException {
-        HashSet<LogicalVariable> free = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> free = new HashSet<>();
         getFreeVariablesInSelfOrDesc(op, free);
         return !free.isEmpty();
     }
 
     public static boolean hasFreeVariables(ILogicalOperator op) throws AlgebricksException {
-        HashSet<LogicalVariable> free = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> free = new HashSet<>();
         getFreeVariablesInOp(op, free);
         return !free.isEmpty();
     }
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 93c9b77..159bcf5 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -83,6 +83,7 @@
 import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -153,8 +154,8 @@
     }
 
     // Gets the index of a child to start top-down data property enforcement.
-    // If there is a partitioning-compatible child with the operator in opRef, start from this child;
-    // otherwise, start from child zero.
+    // If there is a partitioning-compatible child with the operator in opRef,
+    // start from this child; otherwise, start from child zero.
     private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
             IOptimizationContext context) throws AlgebricksException {
         IPhysicalPropertiesVector[] reqdProperties = null;
@@ -366,14 +367,15 @@
                 preSortedDistinct.setDistinctByColumns(d.getDistinctByVarList());
                 break;
             }
+            default:
         }
     }
 
     private List<OrderColumn> getOrderColumnsFromGroupingProperties(List<ILocalStructuralProperty> reqd,
             List<ILocalStructuralProperty> dlvd) {
-        List<OrderColumn> returnedProperties = new ArrayList<OrderColumn>();
-        List<LogicalVariable> rqdCols = new ArrayList<LogicalVariable>();
-        List<LogicalVariable> dlvdCols = new ArrayList<LogicalVariable>();
+        List<OrderColumn> returnedProperties = new ArrayList<>();
+        List<LogicalVariable> rqdCols = new ArrayList<>();
+        List<LogicalVariable> dlvdCols = new ArrayList<>();
         for (ILocalStructuralProperty r : reqd) {
             r.getVariables(rqdCols);
         }
@@ -382,7 +384,7 @@
         }
 
         int prefix = dlvdCols.size() - 1;
-        for (; prefix >= 0;) {
+        while (prefix >= 0) {
             if (!rqdCols.contains(dlvdCols.get(prefix))) {
                 prefix--;
             } else {
@@ -396,7 +398,7 @@
             returnedProperties.add(new OrderColumn(orderColumns.get(j).getColumn(), orderColumns.get(j).getOrder()));
         }
         // maintain other order columns after the required order columns
-        if (returnedProperties.size() != 0) {
+        if (!returnedProperties.isEmpty()) {
             for (int j = prefix + 1; j < dlvdCols.size(); j++) {
                 OrderColumn oc = orderColumns.get(j);
                 returnedProperties.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
@@ -460,9 +462,9 @@
             return;
         }
 
-        Mutable<ILogicalOperator> topOp = new MutableObject<ILogicalOperator>();
+        Mutable<ILogicalOperator> topOp = new MutableObject<>();
         topOp.setValue(op.getInputs().get(i).getValue());
-        LinkedList<LocalOrderProperty> oList = new LinkedList<LocalOrderProperty>();
+        LinkedList<LocalOrderProperty> oList = new LinkedList<>();
 
         for (ILocalStructuralProperty prop : localProperties) {
             switch (prop.getPropertyType()) {
@@ -474,7 +476,7 @@
                     LocalGroupingProperty g = (LocalGroupingProperty) prop;
                     Collection<LogicalVariable> vars =
                             (g.getPreferredOrderEnforcer() != null) ? g.getPreferredOrderEnforcer() : g.getColumnSet();
-                    List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+                    List<OrderColumn> orderColumns = new ArrayList<>();
                     for (LogicalVariable v : vars) {
                         OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
                         orderColumns.add(oc);
@@ -494,6 +496,7 @@
 
         op.getInputs().set(i, topOp);
         OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull((AbstractLogicalOperator) topOp.getValue(), context);
+        OperatorManipulationUtil.setOperatorMode(op);
         printOp((AbstractLogicalOperator) topOp.getValue());
     }
 
@@ -504,7 +507,7 @@
         for (LocalOrderProperty orderProperty : oList) {
             for (OrderColumn oc : orderProperty.getOrderColumns()) {
                 IOrder ordType = (oc.getOrder() == OrderKind.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
-                Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<IOrder, Mutable<ILogicalExpression>>(ordType,
+                Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<>(ordType,
                         new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oc.getColumn())));
                 oe.add(pair);
             }
@@ -532,7 +535,7 @@
             switch (pp.getPartitioningType()) {
                 case UNPARTITIONED: {
                     List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
-                    if (ordCols == null || ordCols.size() == 0) {
+                    if (ordCols.isEmpty()) {
                         pop = new RandomMergeExchangePOperator();
                     } else {
                         if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
@@ -620,17 +623,17 @@
     }
 
     private List<OrderColumn> computeOrderColumns(IPhysicalPropertiesVector pv) {
-        List<OrderColumn> ordCols = new ArrayList<OrderColumn>();
+        List<OrderColumn> ordCols = new ArrayList<>();
         List<ILocalStructuralProperty> localProps = pv.getLocalProperties();
-        if (localProps == null || localProps.size() == 0) {
-            return null;
+        if (localProps == null || localProps.isEmpty()) {
+            return new ArrayList<>();
         } else {
             for (ILocalStructuralProperty p : localProps) {
                 if (p.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
                     LocalOrderProperty lop = (LocalOrderProperty) p;
                     ordCols.addAll(lop.getOrderColumns());
                 } else {
-                    return null;
+                    return new ArrayList<>();
                 }
             }
             return ordCols;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
index 6bc129e..c670b6b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/IntroduceProjectsRule.java
@@ -48,10 +48,10 @@
  */
 public class IntroduceProjectsRule implements IAlgebraicRewriteRule {
 
-    private final Set<LogicalVariable> usedVars = new HashSet<LogicalVariable>();
-    private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
-    private final Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
-    private final List<LogicalVariable> projectVars = new ArrayList<LogicalVariable>();
+    private final Set<LogicalVariable> usedVars = new HashSet<>();
+    private final Set<LogicalVariable> liveVars = new HashSet<>();
+    private final Set<LogicalVariable> producedVars = new HashSet<>();
+    private final List<LogicalVariable> projectVars = new ArrayList<>();
     protected boolean hasRun = false;
 
     @Override
@@ -78,7 +78,7 @@
         VariableUtilities.getUsedVariables(op, usedVars);
 
         // In the top-down pass, maintain a set of variables that are used in op and all its parents.
-        HashSet<LogicalVariable> parentsUsedVars = new HashSet<LogicalVariable>();
+        HashSet<LogicalVariable> parentsUsedVars = new HashSet<>();
         parentsUsedVars.addAll(parentUsedVars);
         parentsUsedVars.addAll(usedVars);
 
@@ -115,7 +115,7 @@
                 ILogicalOperator childOp = op.getInputs().get(i).getValue();
                 liveVars.clear();
                 VariableUtilities.getLiveVariables(childOp, liveVars);
-                List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+                List<LogicalVariable> vars = new ArrayList<>();
                 vars.addAll(projectVars);
                 // Only retain those variables that are live in the i-th input branch.
                 vars.retainAll(liveVars);
@@ -132,8 +132,8 @@
             liveVars.clear();
             VariableUtilities.getLiveVariables(op.getInputs().get(0).getValue(), liveVars);
             ProjectOperator projectOp = (ProjectOperator) op;
-            List<LogicalVariable> projectVars = projectOp.getVariables();
-            if (liveVars.size() == projectVars.size() && liveVars.containsAll(projectVars)) {
+            List<LogicalVariable> projectVarsTemp = projectOp.getVariables();
+            if (liveVars.size() == projectVarsTemp.size() && liveVars.containsAll(projectVarsTemp)) {
                 boolean eliminateProject = true;
                 // For UnionAll the variables must also be in exactly the correct order.
                 if (parentOp.getOperatorTag() == LogicalOperatorTag.UNIONALL) {
@@ -155,7 +155,7 @@
 
     private boolean canEliminateProjectBelowUnion(UnionAllOperator unionOp, ProjectOperator projectOp,
             int unionInputIndex) throws AlgebricksException {
-        List<LogicalVariable> orderedLiveVars = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> orderedLiveVars = new ArrayList<>();
         VariableUtilities.getLiveVariables(projectOp.getInputs().get(0).getValue(), orderedLiveVars);
         int numVars = orderedLiveVars.size();
         for (int i = 0; i < numVars; i++) {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
index c193c3b..1ed34f6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManager.java
@@ -35,10 +35,16 @@
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
 
 /**
- * This buffer manager will dived the buffers into given number of partitions.
+ * This buffer manager will divide the buffers into given number of partitions.
  * The cleared partition (spilled one in the caller side) can only get no more than one frame.
  */
 public class VPartitionTupleBufferManager implements IPartitionedTupleBufferManager {
+    public static final IPartitionedMemoryConstrain NO_CONSTRAIN = new IPartitionedMemoryConstrain() {
+        @Override
+        public int frameLimit(int partitionId) {
+            return Integer.MAX_VALUE;
+        }
+    };
 
     private IDeallocatableFramePool framePool;
     private IFrameBufferManager[] partitionArray;
@@ -102,8 +108,8 @@
             for (int i = 0; i < partition.getNumFrames(); ++i) {
                 framePool.deAllocateBuffer(partition.getFrame(i, tempInfo).getBuffer());
             }
+            partition.reset();
         }
-        partitionArray[partitionId].reset();
         numTuples[partitionId] = 0;
     }
 
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index e229a3c..4ed11e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -36,9 +36,9 @@
  */
 public class VariableDeletableTupleMemoryManager implements IDeletableTupleBufferManager {
 
-    private final static Logger LOG = Logger.getLogger(VariableDeletableTupleMemoryManager.class.getName());
+    private static final Logger LOG = Logger.getLogger(VariableDeletableTupleMemoryManager.class.getName());
 
-    private final int MIN_FREE_SPACE;
+    private final int minFreeSpace;
     private final IFramePool pool;
     private final IFrameFreeSlotPolicy policy;
     private final IAppendDeletableFrameTupleAccessor accessor;
@@ -53,7 +53,7 @@
         this.policy = new FrameFreeSlotLastFit(maxFrames);
         this.accessor = new DeletableFrameTupleAppender(recordDescriptor);
         this.frames = new ArrayList<>();
-        this.MIN_FREE_SPACE = calculateMinFreeSpace(recordDescriptor);
+        this.minFreeSpace = calculateMinFreeSpace(recordDescriptor);
         this.recordDescriptor = recordDescriptor;
         this.numTuples = 0;
         this.statsReOrg = 0;
@@ -92,7 +92,7 @@
         int tid = accessor.append(fta, idx);
         assert tid >= 0;
         tuplePointer.reset(frameId, tid);
-        if (accessor.getContiguousFreeSpace() > MIN_FREE_SPACE) {
+        if (accessor.getContiguousFreeSpace() > minFreeSpace) {
             policy.pushNewFrame(frameId, accessor.getContiguousFreeSpace());
         }
         numTuples++;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 09207b9..4fa1498 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -19,9 +19,6 @@
 
 package org.apache.hyracks.dataflow.std.join;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -98,22 +95,9 @@
     public static class JoinCacheTaskState extends AbstractStateObject {
         private NestedLoopJoin joiner;
 
-        public JoinCacheTaskState() {
-        }
-
         private JoinCacheTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
     }
 
     private class JoinCacheActivityNode extends AbstractActivityNode {
@@ -132,8 +116,8 @@
             final RecordDescriptor rd0 = recordDescProvider.getInputRecordDescriptor(nljAid, 0);
             final RecordDescriptor rd1 = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final ITuplePairComparator comparator = comparatorFactory.createTuplePairComparator(ctx);
-            final IPredicateEvaluator predEvaluator = ((predEvaluatorFactory != null)
-                    ? predEvaluatorFactory.createPredicateEvaluator() : null);
+            final IPredicateEvaluator predEvaluator = (predEvaluatorFactory != null)
+                    ? predEvaluatorFactory.createPredicateEvaluator() : null;
 
             final IMissingWriter[] nullWriters1 = isLeftOuter ? new IMissingWriter[nullWriterFactories1.length] : null;
             if (isLeftOuter) {
@@ -142,7 +126,7 @@
                 }
             }
 
-            IOperatorNodePushable op = new AbstractUnaryInputSinkOperatorNodePushable() {
+            return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private JoinCacheTaskState state;
 
                 @Override
@@ -170,9 +154,9 @@
 
                 @Override
                 public void fail() throws HyracksDataException {
+                    // No variables to update.
                 }
             };
-            return op;
         }
     }
 
@@ -186,8 +170,7 @@
         @Override
         public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
                 IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-
-            IOperatorNodePushable op = new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
+            return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
                 private JoinCacheTaskState state;
 
                 @Override
@@ -216,7 +199,6 @@
                     writer.fail();
                 }
             };
-            return op;
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
index e032e6a..6d9d085 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractExternalSortRunMerger.java
@@ -49,7 +49,6 @@
     private final INormalizedKeyComputer nmkComputer;
     private final RecordDescriptor recordDesc;
     private final int framesLimit;
-    private final int MAX_FRAME_SIZE;
     private final int topK;
     private List<GroupVSizeFrame> inFrames;
     private VSizeFrame outputFrame;
@@ -75,14 +74,13 @@
         this.recordDesc = recordDesc;
         this.framesLimit = framesLimit;
         this.writer = writer;
-        this.MAX_FRAME_SIZE = FrameConstants.MAX_FRAMESIZE;
         this.topK = topK;
     }
 
     public void process() throws HyracksDataException {
         IFrameWriter finalWriter = null;
         try {
-            if (runs.size() <= 0) {
+            if (runs.isEmpty()) {
                 finalWriter = prepareSkipMergingFinalResultWriter(writer);
                 finalWriter.open();
                 if (sorter != null) {
@@ -169,9 +167,10 @@
         }
     }
 
-    private static int selectPartialRuns(int budget, List<GeneratedRunFileReader> runs,
+    private static int selectPartialRuns(int argBudget, List<GeneratedRunFileReader> runs,
             List<GeneratedRunFileReader> partialRuns, BitSet runAvailable, int stop) {
         partialRuns.clear();
+        int budget = argBudget;
         int maxFrameSizeOfGenRun = 0;
         int nextRunId = runAvailable.nextSetBit(0);
         while (budget > 0 && nextRunId >= 0 && nextRunId < stop) {
@@ -192,13 +191,14 @@
         if (extraFreeMem > 0 && partialRuns.size() > 1) {
             int extraFrames = extraFreeMem / ctx.getInitialFrameSize();
             int avg = (extraFrames / partialRuns.size()) * ctx.getInitialFrameSize();
-            int residue = (extraFrames % partialRuns.size());
+            int residue = extraFrames % partialRuns.size();
             for (int i = 0; i < residue; i++) {
-                partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE,
+                partialRuns.get(i).updateSize(Math.min(FrameConstants.MAX_FRAMESIZE,
                         partialRuns.get(i).getMaxFrameSize() + avg + ctx.getInitialFrameSize()));
             }
             for (int i = residue; i < partialRuns.size() && avg > 0; i++) {
-                partialRuns.get(i).updateSize(Math.min(MAX_FRAME_SIZE, partialRuns.get(i).getMaxFrameSize() + avg));
+                partialRuns.get(i)
+                        .updateSize(Math.min(FrameConstants.MAX_FRAMESIZE, partialRuns.get(i).getMaxFrameSize() + avg));
             }
         }
 
@@ -214,17 +214,17 @@
         }
     }
 
-    abstract protected IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
+    protected abstract IFrameWriter prepareSkipMergingFinalResultWriter(IFrameWriter nextWriter)
             throws HyracksDataException;
 
-    abstract protected RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
+    protected abstract RunFileWriter prepareIntermediateMergeRunFile() throws HyracksDataException;
 
-    abstract protected IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
+    protected abstract IFrameWriter prepareIntermediateMergeResultWriter(RunFileWriter mergeFileWriter)
             throws HyracksDataException;
 
-    abstract protected IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
+    protected abstract IFrameWriter prepareFinalMergeResultWriter(IFrameWriter nextWriter) throws HyracksDataException;
 
-    abstract protected int[] getSortFields();
+    protected abstract int[] getSortFields();
 
     private void merge(IFrameWriter writer, List<GeneratedRunFileReader> partialRuns) throws HyracksDataException {
         RunMergingFrameReader merger = new RunMergingFrameReader(ctx, partialRuns, inFrames, getSortFields(),
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
index a6686cb..77d5d49 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractFrameSorter.java
@@ -36,10 +36,10 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.util.IntSerDeUtils;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.buffermanager.BufferInfo;
 import org.apache.hyracks.dataflow.std.buffermanager.IFrameBufferManager;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.util.IntSerDeUtils;
 
 public abstract class AbstractFrameSorter implements IFrameSorter {
 
@@ -185,13 +185,13 @@
     }
 
     protected final int compare(int tp1, int tp2) throws HyracksDataException {
-        int i1 = tPointers[tp1 * 4];
-        int j1 = tPointers[tp1 * 4 + 1];
-        int v1 = tPointers[tp1 * 4 + 3];
+        int i1 = tPointers[tp1 * 4 + ID_FRAMEID];
+        int j1 = tPointers[tp1 * 4 + ID_TUPLE_START];
+        int v1 = tPointers[tp1 * 4 + ID_NORMAL_KEY];
 
-        int tp2i = tPointers[tp2 * 4];
-        int tp2j = tPointers[tp2 * 4 + 1];
-        int tp2v = tPointers[tp2 * 4 + 3];
+        int tp2i = tPointers[tp2 * 4 + ID_FRAMEID];
+        int tp2j = tPointers[tp2 * 4 + ID_TUPLE_START];
+        int tp2v = tPointers[tp2 * 4 + ID_NORMAL_KEY];
 
         if (v1 != tp2v) {
             return ((((long) v1) & 0xffffffffL) < (((long) tp2v) & 0xffffffffL)) ? -1 : 1;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index 665a90b..273d5ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -19,9 +19,6 @@
 
 package org.apache.hyracks.dataflow.std.sort;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.logging.Level;
@@ -99,16 +96,6 @@
         public SortTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
-
-        @Override
-        public void toBytes(DataOutput out) throws IOException {
-
-        }
-
-        @Override
-        public void fromBytes(DataInput in) throws IOException {
-
-        }
     }
 
     protected abstract class SortActivity extends AbstractActivityNode {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
index c74fe04..44bda73 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/util/FrameTuplePairComparator.java
@@ -53,8 +53,8 @@
             int fEnd1 = accessor1.getFieldEndOffset(tIndex1, fIdx1);
             int fLen1 = fEnd1 - fStart1;
 
-            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, accessor1
-                    .getBuffer().array(), fStart1 + fStartOffset1, fLen1);
+            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+                    accessor1.getBuffer().array(), fStart1 + fStartOffset1, fLen1);
             if (c != 0) {
                 return c;
             }
@@ -76,8 +76,8 @@
             int fStart1 = bufferAccessor.getAbsFieldStartOffset(keys1[i]);
             int fLen1 = bufferAccessor.getFieldLength(keys1[i]);
 
-            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0, bufferAccessor
-                    .getBuffer().array(), fStart1, fLen1);
+            int c = comparators[i].compare(accessor0.getBuffer().array(), fStart0 + fStartOffset0, fLen0,
+                    bufferAccessor.getBuffer().array(), fStart1, fLen1);
             if (c != 0) {
                 return c;
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
index 726b654..7cc6762 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/buffermanager/VPartitionTupleBufferManagerTest.java
@@ -40,14 +40,8 @@
 
     @Before
     public void setup() throws HyracksDataException {
-        IPartitionedMemoryConstrain constrain = new IPartitionedMemoryConstrain() {
-            @Override
-            public int frameLimit(int partitionId) {
-                return Integer.MAX_VALUE;
-            }
-        };
-        bufferManager = new VPartitionTupleBufferManager(Common.commonFrameManager, constrain, PARTITION,
-                Common.BUDGET);
+        bufferManager = new VPartitionTupleBufferManager(Common.commonFrameManager,
+                VPartitionTupleBufferManager.NO_CONSTRAIN, PARTITION, Common.BUDGET);
     }
 
     @Test